1 #include "../test.h"
2 #include "rxcpp/operators/rx-with_latest_from.hpp"
3 
4 SCENARIO("with_latest_from interleaved with tail", "[with_latest_from][join][operators]"){
5     GIVEN("2 hot observables of ints."){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         auto o1 = sc.make_hot_observable({
11             on.next(150, 1),
12             on.next(215, 2),
13             on.next(225, 4),
14             on.completed(230)
15         });
16 
17         auto o2 = sc.make_hot_observable({
18             on.next(150, 1),
19             on.next(220, 3),
20             on.next(230, 5),
21             on.next(235, 6),
22             on.next(240, 7),
23             on.completed(250)
24         });
25 
26         WHEN("each int is combined with the latest from the other source"){
27 
28             auto res = w.start(
__anon9ee5cb450102() 29                 [&]() {
30                     return o2
31                         .with_latest_from(
32                             [](int v2, int v1){
33                                 return v2 + v1;
34                             },
35                             o1
36                         )
37                         // forget type to workaround lambda deduction bug on msvc 2013
38                         .as_dynamic();
39                 }
40             );
41 
42             THEN("the output contains combined ints"){
43                 auto required = rxu::to_vector({
44                     on.next(220, 2 + 3),
45                     on.next(230, 4 + 5),
46                     on.next(235, 4 + 6),
47                     on.next(240, 4 + 7),
48                     on.completed(250)
49                 });
50                 auto actual = res.get_observer().messages();
51                 REQUIRE(required == actual);
52             }
53 
54             THEN("there was one subscription and one unsubscription to the o1"){
55                 auto required = rxu::to_vector({
56                     on.subscribe(200, 230)
57                 });
58                 auto actual = o1.subscriptions();
59                 REQUIRE(required == actual);
60             }
61 
62             THEN("there was one subscription and one unsubscription to the o2"){
63                 auto required = rxu::to_vector({
64                     on.subscribe(200, 250)
65                 });
66                 auto actual = o2.subscriptions();
67                 REQUIRE(required == actual);
68             }
69         }
70     }
71 }
72 
73 SCENARIO("with_latest_from consecutive", "[with_latest_from][join][operators]"){
74     GIVEN("2 hot observables of ints."){
75         auto sc = rxsc::make_test();
76         auto w = sc.create_worker();
77         const rxsc::test::messages<int> on;
78 
79         auto o1 = sc.make_hot_observable({
80             on.next(150, 1),
81             on.next(215, 2),
82             on.next(225, 4),
83             on.completed(230)
84         });
85 
86         auto o2 = sc.make_hot_observable({
87             on.next(150, 1),
88             on.next(235, 6),
89             on.next(240, 7),
90             on.completed(250)
91         });
92 
93         WHEN("each int is combined with the latest from the other source"){
94 
95             auto res = w.start(
__anon9ee5cb450302() 96                 [&]() {
97                     return o2
98                         .with_latest_from(
99                             [](int v2, int v1){
100                                 return v2 + v1;
101                             },
102                             o1
103                         )
104                         // forget type to workaround lambda deduction bug on msvc 2013
105                         .as_dynamic();
106                 }
107             );
108 
109             THEN("the output contains combined ints"){
110                 auto required = rxu::to_vector({
111                     on.next(235, 4 + 6),
112                     on.next(240, 4 + 7),
113                     on.completed(250)
114                 });
115                 auto actual = res.get_observer().messages();
116                 REQUIRE(required == actual);
117             }
118 
119             THEN("there was one subscription and one unsubscription to the o1"){
120                 auto required = rxu::to_vector({
121                     on.subscribe(200, 230)
122                 });
123                 auto actual = o1.subscriptions();
124                 REQUIRE(required == actual);
125             }
126 
127             THEN("there was one subscription and one unsubscription to the o2"){
128                 auto required = rxu::to_vector({
129                     on.subscribe(200, 250)
130                 });
131                 auto actual = o2.subscriptions();
132                 REQUIRE(required == actual);
133             }
134         }
135     }
136 }
137 
138 SCENARIO("with_latest_from consecutive ends with error left", "[with_latest_from][join][operators]"){
139     GIVEN("2 hot observables of ints."){
140         auto sc = rxsc::make_test();
141         auto w = sc.create_worker();
142         const rxsc::test::messages<int> on;
143 
144         std::runtime_error ex("with_latest_from on_error from source");
145 
146         auto o1 = sc.make_hot_observable({
147             on.next(150, 1),
148             on.next(215, 2),
149             on.next(225, 4),
150             on.error(230, ex)
151         });
152 
153         auto o2 = sc.make_hot_observable({
154             on.next(150, 1),
155             on.next(235, 6),
156             on.next(240, 7),
157             on.completed(250)
158         });
159 
160         WHEN("each int is combined with the latest from the other source"){
161 
162             auto res = w.start(
__anon9ee5cb450502() 163                 [&]() {
164                     return o2
165                         .with_latest_from(
166                             [](int v2, int v1){
167                                 return v2 + v1;
168                             },
169                             o1
170                         )
171                         // forget type to workaround lambda deduction bug on msvc 2013
172                         .as_dynamic();
173                 }
174             );
175 
176             THEN("the output contains only an error"){
177                 auto required = rxu::to_vector({
178                     on.error(230, ex)
179                 });
180                 auto actual = res.get_observer().messages();
181                 REQUIRE(required == actual);
182             }
183 
184             THEN("there was one subscription and one unsubscription to the o1"){
185                 auto required = rxu::to_vector({
186                     on.subscribe(200, 230)
187                 });
188                 auto actual = o1.subscriptions();
189                 REQUIRE(required == actual);
190             }
191 
192             THEN("there was one subscription and one unsubscription to the o2"){
193                 auto required = rxu::to_vector({
194                     on.subscribe(200, 230)
195                 });
196                 auto actual = o2.subscriptions();
197                 REQUIRE(required == actual);
198             }
199         }
200     }
201 }
202 
203 SCENARIO("with_latest_from consecutive ends with error right", "[with_latest_from][join][operators]"){
204     GIVEN("2 hot observables of ints."){
205         auto sc = rxsc::make_test();
206         auto w = sc.create_worker();
207         const rxsc::test::messages<int> on;
208 
209         std::runtime_error ex("with_latest_from on_error from source");
210 
211         auto o1 = sc.make_hot_observable({
212             on.next(150, 1),
213             on.next(215, 2),
214             on.next(225, 4),
215             on.completed(250)
216         });
217 
218         auto o2 = sc.make_hot_observable({
219             on.next(150, 1),
220             on.next(235, 6),
221             on.next(240, 7),
222             on.error(245, ex)
223         });
224 
225         WHEN("each int is combined with the latest from the other source"){
226 
227             auto res = w.start(
__anon9ee5cb450702() 228                 [&]() {
229                     return o2
230                         .with_latest_from(
231                             [](int v2, int v1){
232                                 return v2 + v1;
233                             },
234                             o1
235                         )
236                         // forget type to workaround lambda deduction bug on msvc 2013
237                         .as_dynamic();
238                 }
239             );
240 
241             THEN("the output contains combined ints followed by an error"){
242                 auto required = rxu::to_vector({
243                     on.next(235, 4 + 6),
244                     on.next(240, 4 + 7),
245                     on.error(245, ex)
246                 });
247                 auto actual = res.get_observer().messages();
248                 REQUIRE(required == actual);
249             }
250 
251             THEN("there was one subscription and one unsubscription to the o1"){
252                 auto required = rxu::to_vector({
253                     on.subscribe(200, 245)
254                 });
255                 auto actual = o1.subscriptions();
256                 REQUIRE(required == actual);
257             }
258 
259             THEN("there was one subscription and one unsubscription to the o2"){
260                 auto required = rxu::to_vector({
261                     on.subscribe(200, 245)
262                 });
263                 auto actual = o2.subscriptions();
264                 REQUIRE(required == actual);
265             }
266         }
267     }
268 }
269 
270 SCENARIO("with_latest_from never N", "[with_latest_from][join][operators]"){
271     GIVEN("N never completed hot observables of ints."){
272         auto sc = rxsc::make_test();
273         auto w = sc.create_worker();
274         const rxsc::test::messages<int> on;
275 
276         const int N = 4;
277 
278         std::vector<rxcpp::test::testable_observable<int>> n;
279         for (int i = 0; i < N; ++i) {
280             n.push_back(
281                 sc.make_hot_observable({
282                     on.next(150, 1)
283                 })
284             );
285         }
286 
287         WHEN("each int is combined with the latest from the other source"){
288 
289             auto res = w.start(
__anon9ee5cb450902() 290                 [&]() {
291                     return n[0]
292                         .with_latest_from(
293                             [](int v0, int v1, int v2, int v3){
294                                 return v0 + v1 + v2 + v3;
295                             },
296                             n[1], n[2], n[3]
297                         )
298                         // forget type to workaround lambda deduction bug on msvc 2013
299                         .as_dynamic();
300                 }
301             );
302 
303             THEN("the output is empty"){
304                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
305                 auto actual = res.get_observer().messages();
306                 REQUIRE(required == actual);
307             }
308 
309             THEN("there was one subscription and one unsubscription to each observable"){
310 
__anon9ee5cb450b02(rxcpp::test::testable_observable<int> &s)311                 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){
312                     auto required = rxu::to_vector({
313                         on.subscribe(200, 1000)
314                     });
315                     auto actual = s.subscriptions();
316                     REQUIRE(required == actual);
317                 });
318             }
319         }
320     }
321 }
322 
323 SCENARIO("with_latest_from empty N", "[with_latest_from][join][operators]"){
324     GIVEN("N empty hot observables of ints."){
325         auto sc = rxsc::make_test();
326         auto w = sc.create_worker();
327         const rxsc::test::messages<int> on;
328 
329         const int N = 4;
330 
331         std::vector<rxcpp::test::testable_observable<int>> e;
332         for (int i = 0; i < N; ++i) {
333             e.push_back(
334                 sc.make_hot_observable({
335                     on.next(150, 1),
336                     on.completed(210 + 10 * i)
337                 })
338             );
339         }
340 
341         WHEN("each int is combined with the latest from the other source"){
342 
343             auto res = w.start(
__anon9ee5cb450c02() 344                 [&]() {
345                     return e[0]
346                         .with_latest_from(
347                             [](int v0, int v1, int v2, int v3){
348                                 return v0 + v1 + v2 + v3;
349                             },
350                             e[1], e[2], e[3]
351                         )
352                         // forget type to workaround lambda deduction bug on msvc 2013
353                         .as_dynamic();
354                 }
355             );
356 
357             THEN("the output contains only complete message"){
358                 auto required = rxu::to_vector({
359                     on.completed(200 + 10 * N)
360                 });
361                 auto actual = res.get_observer().messages();
362                 REQUIRE(required == actual);
363             }
364 
365             THEN("there was one subscription and one unsubscription to each observable"){
366 
367                 int i = 0;
__anon9ee5cb450e02(rxcpp::test::testable_observable<int> &s)368                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
369                     auto required = rxu::to_vector({
370                         on.subscribe(200, 200 + 10 * ++i)
371                     });
372                     auto actual = s.subscriptions();
373                     REQUIRE(required == actual);
374                 });
375             }
376         }
377     }
378 }
379 
380 SCENARIO("with_latest_from never/empty", "[with_latest_from][join][operators]"){
381     GIVEN("2 hot observables of ints."){
382         auto sc = rxsc::make_test();
383         auto w = sc.create_worker();
384         const rxsc::test::messages<int> on;
385 
386         auto n = sc.make_hot_observable({
387             on.next(150, 1)
388         });
389 
390         auto e = sc.make_hot_observable({
391             on.next(150, 1),
392             on.completed(210)
393         });
394 
395         WHEN("each int is combined with the latest from the other source"){
396 
397             auto res = w.start(
__anon9ee5cb450f02() 398                 [&]() {
399                     return n
400                         .with_latest_from(
401                             [](int v2, int v1){
402                                 return v2 + v1;
403                             },
404                             e
405                         )
406                         // forget type to workaround lambda deduction bug on msvc 2013
407                         .as_dynamic();
408                 }
409             );
410 
411             THEN("the output is empty"){
412                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
413                 auto actual = res.get_observer().messages();
414                 REQUIRE(required == actual);
415             }
416 
417             THEN("there was one subscription and one unsubscription to the n"){
418                 auto required = rxu::to_vector({
419                     on.subscribe(200, 1000)
420                 });
421                 auto actual = n.subscriptions();
422                 REQUIRE(required == actual);
423             }
424 
425             THEN("there was one subscription and one unsubscription to the e"){
426                 auto required = rxu::to_vector({
427                     on.subscribe(200, 210)
428                 });
429                 auto actual = e.subscriptions();
430                 REQUIRE(required == actual);
431             }
432         }
433     }
434 }
435 
436 SCENARIO("with_latest_from empty/never", "[with_latest_from][join][operators]"){
437     GIVEN("2 hot observables of ints."){
438         auto sc = rxsc::make_test();
439         auto w = sc.create_worker();
440         const rxsc::test::messages<int> on;
441 
442         auto e = sc.make_hot_observable({
443             on.next(150, 1),
444             on.completed(210)
445         });
446 
447         auto n = sc.make_hot_observable({
448             on.next(150, 1)
449         });
450 
451         WHEN("each int is combined with the latest from the other source"){
452 
453             auto res = w.start(
__anon9ee5cb451102() 454                 [&]() {
455                     return e
456                         .with_latest_from(
457                             [](int v2, int v1){
458                                 return v2 + v1;
459                             },
460                             n
461                         )
462                         // forget type to workaround lambda deduction bug on msvc 2013
463                         .as_dynamic();
464                 }
465             );
466 
467             THEN("the output is empty"){
468                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
469                 auto actual = res.get_observer().messages();
470                 REQUIRE(required == actual);
471             }
472 
473             THEN("there was one subscription and one unsubscription to the e"){
474                 auto required = rxu::to_vector({
475                     on.subscribe(200, 210)
476                 });
477                 auto actual = e.subscriptions();
478                 REQUIRE(required == actual);
479             }
480 
481             THEN("there was one subscription and one unsubscription to the n"){
482                 auto required = rxu::to_vector({
483                     on.subscribe(200, 1000)
484                 });
485                 auto actual = n.subscriptions();
486                 REQUIRE(required == actual);
487             }
488         }
489     }
490 }
491 
492 SCENARIO("with_latest_from empty/return", "[with_latest_from][join][operators]"){
493     GIVEN("2 hot observables of ints."){
494         auto sc = rxsc::make_test();
495         auto w = sc.create_worker();
496         const rxsc::test::messages<int> on;
497 
498         auto e = sc.make_hot_observable({
499             on.next(150, 1),
500             on.completed(210)
501         });
502 
503         auto o = sc.make_hot_observable({
504             on.next(150, 1),
505             on.next(215, 2),
506             on.completed(220)
507         });
508 
509         WHEN("each int is combined with the latest from the other source"){
510 
511             auto res = w.start(
__anon9ee5cb451302() 512                 [&]() {
513                     return e
514                         .with_latest_from(
515                             [](int v2, int v1){
516                                 return v2 + v1;
517                             },
518                             o
519                         )
520                         // forget type to workaround lambda deduction bug on msvc 2013
521                         .as_dynamic();
522                 }
523             );
524 
525             THEN("the output contains only complete message"){
526                 auto required = rxu::to_vector({
527                     on.completed(220)
528                 });
529                 auto actual = res.get_observer().messages();
530                 REQUIRE(required == actual);
531             }
532 
533             THEN("there was one subscription and one unsubscription to the e"){
534                 auto required = rxu::to_vector({
535                     on.subscribe(200, 210)
536                 });
537                 auto actual = e.subscriptions();
538                 REQUIRE(required == actual);
539             }
540 
541             THEN("there was one subscription and one unsubscription to the o"){
542                 auto required = rxu::to_vector({
543                     on.subscribe(200, 220)
544                 });
545                 auto actual = o.subscriptions();
546                 REQUIRE(required == actual);
547             }
548         }
549     }
550 }
551 
552 SCENARIO("with_latest_from return/empty", "[with_latest_from][join][operators]"){
553     GIVEN("2 hot observables of ints."){
554         auto sc = rxsc::make_test();
555         auto w = sc.create_worker();
556         const rxsc::test::messages<int> on;
557 
558         auto o = sc.make_hot_observable({
559             on.next(150, 1),
560             on.next(215, 2),
561             on.completed(220)
562         });
563 
564         auto e = sc.make_hot_observable({
565             on.next(150, 1),
566             on.completed(210)
567         });
568 
569         WHEN("each int is combined with the latest from the other source"){
570 
571             auto res = w.start(
__anon9ee5cb451502() 572                 [&]() {
573                     return o
574                         .with_latest_from(
575                             [](int v2, int v1){
576                                 return v2 + v1;
577                             },
578                             e
579                         )
580                         // forget type to workaround lambda deduction bug on msvc 2013
581                         .as_dynamic();
582                 }
583             );
584 
585             THEN("the output contains only complete message"){
586                 auto required = rxu::to_vector({
587                     on.completed(220)
588                 });
589                 auto actual = res.get_observer().messages();
590                 REQUIRE(required == actual);
591             }
592 
593             THEN("there was one subscription and one unsubscription to the o"){
594                 auto required = rxu::to_vector({
595                     on.subscribe(200, 220)
596                 });
597                 auto actual = o.subscriptions();
598                 REQUIRE(required == actual);
599             }
600 
601             THEN("there was one subscription and one unsubscription to the e"){
602                 auto required = rxu::to_vector({
603                     on.subscribe(200, 210)
604                 });
605                 auto actual = e.subscriptions();
606                 REQUIRE(required == actual);
607             }
608         }
609     }
610 }
611 
612 SCENARIO("with_latest_from never/return", "[with_latest_from][join][operators]"){
613     GIVEN("2 hot observables of ints."){
614         auto sc = rxsc::make_test();
615         auto w = sc.create_worker();
616         const rxsc::test::messages<int> on;
617 
618         auto n = sc.make_hot_observable({
619             on.next(150, 1)
620         });
621 
622         auto o = sc.make_hot_observable({
623             on.next(150, 1),
624             on.next(215, 2),
625             on.completed(220)
626         });
627 
628         WHEN("each int is combined with the latest from the other source"){
629 
630             auto res = w.start(
__anon9ee5cb451702() 631                 [&]() {
632                     return n
633                         .with_latest_from(
634                             [](int v2, int v1){
635                                 return v2 + v1;
636                             },
637                             o
638                         )
639                         // forget type to workaround lambda deduction bug on msvc 2013
640                         .as_dynamic();
641                 }
642             );
643 
644             THEN("the output is empty"){
645                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
646                 auto actual = res.get_observer().messages();
647                 REQUIRE(required == actual);
648             }
649 
650             THEN("there was one subscription and one unsubscription to the n"){
651                 auto required = rxu::to_vector({
652                     on.subscribe(200, 1000)
653                 });
654                 auto actual = n.subscriptions();
655                 REQUIRE(required == actual);
656             }
657 
658             THEN("there was one subscription and one unsubscription to the o"){
659                 auto required = rxu::to_vector({
660                     on.subscribe(200, 220)
661                 });
662                 auto actual = o.subscriptions();
663                 REQUIRE(required == actual);
664             }
665         }
666     }
667 }
668 
669 SCENARIO("with_latest_from return/never", "[with_latest_from][join][operators]"){
670     GIVEN("2 hot observables of ints."){
671         auto sc = rxsc::make_test();
672         auto w = sc.create_worker();
673         const rxsc::test::messages<int> on;
674 
675         auto o = sc.make_hot_observable({
676             on.next(150, 1),
677             on.next(215, 2),
678             on.completed(220)
679         });
680 
681         auto n = sc.make_hot_observable({
682             on.next(150, 1)
683         });
684 
685         WHEN("each int is combined with the latest from the other source"){
686 
687             auto res = w.start(
__anon9ee5cb451902() 688                 [&]() {
689                     return o
690                         .with_latest_from(
691                             [](int v2, int v1){
692                                 return v2 + v1;
693                             },
694                             n
695                         )
696                         // forget type to workaround lambda deduction bug on msvc 2013
697                         .as_dynamic();
698                 }
699             );
700 
701             THEN("the output is empty"){
702                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
703                 auto actual = res.get_observer().messages();
704                 REQUIRE(required == actual);
705             }
706 
707             THEN("there was one subscription and one unsubscription to the n"){
708                 auto required = rxu::to_vector({
709                     on.subscribe(200, 1000)
710                 });
711                 auto actual = n.subscriptions();
712                 REQUIRE(required == actual);
713             }
714 
715             THEN("there was one subscription and one unsubscription to the o"){
716                 auto required = rxu::to_vector({
717                     on.subscribe(200, 220)
718                 });
719                 auto actual = o.subscriptions();
720                 REQUIRE(required == actual);
721             }
722         }
723     }
724 }
725 
726 
727 SCENARIO("with_latest_from return/return", "[with_latest_from][join][operators]"){
728     GIVEN("2 hot observables of ints."){
729         auto sc = rxsc::make_test();
730         auto w = sc.create_worker();
731         const rxsc::test::messages<int> on;
732 
733         auto o1 = sc.make_hot_observable({
734             on.next(150, 1),
735             on.next(215, 2),
736             on.completed(230)
737         });
738 
739         auto o2 = sc.make_hot_observable({
740             on.next(150, 1),
741             on.next(220, 3),
742             on.completed(240)
743         });
744 
745         WHEN("each int is combined with the latest from the other source"){
746 
747             auto res = w.start(
__anon9ee5cb451b02() 748                 [&]() {
749                     return o1
750                         .with_latest_from(
751                             [](int v2, int v1){
752                              return v2 + v1;
753                             },
754                             o2
755                         )
756                         // forget type to workaround lambda deduction bug on msvc 2013
757                         .as_dynamic();
758                 }
759             );
760 
761             THEN("the output contains combined ints"){
762                 auto required = rxu::to_vector({
763                     on.completed(240)
764                 });
765                 auto actual = res.get_observer().messages();
766                 REQUIRE(required == actual);
767             }
768 
769             THEN("there was one subscription and one unsubscription to the o1"){
770                 auto required = rxu::to_vector({
771                     on.subscribe(200, 230)
772                 });
773                 auto actual = o1.subscriptions();
774                 REQUIRE(required == actual);
775             }
776 
777             THEN("there was one subscription and one unsubscription to the o2"){
778                 auto required = rxu::to_vector({
779                     on.subscribe(200, 240)
780                 });
781                 auto actual = o2.subscriptions();
782                 REQUIRE(required == actual);
783             }
784         }
785     }
786 }
787 
788 SCENARIO("with_latest_from empty/error", "[with_latest_from][join][operators]"){
789     GIVEN("2 hot observables of ints."){
790         auto sc = rxsc::make_test();
791         auto w = sc.create_worker();
792         const rxsc::test::messages<int> on;
793 
794         std::runtime_error ex("with_latest_from on_error from source");
795 
796         auto emp = sc.make_hot_observable({
797             on.next(150, 1),
798             on.completed(230)
799         });
800 
801         auto err = sc.make_hot_observable({
802             on.next(150, 1),
803             on.error(220, ex)
804         });
805 
806         WHEN("each int is combined with the latest from the other source"){
807 
808             auto res = w.start(
__anon9ee5cb451d02() 809                 [&]() {
810                     return emp
811                         .with_latest_from(
812                             [](int v2, int v1){
813                                 return v2 + v1;
814                             },
815                             err
816                         )
817                         // forget type to workaround lambda deduction bug on msvc 2013
818                         .as_dynamic();
819                 }
820             );
821 
822             THEN("the output contains only error message"){
823                 auto required = rxu::to_vector({
824                     on.error(220, ex)
825                 });
826                 auto actual = res.get_observer().messages();
827                 REQUIRE(required == actual);
828             }
829 
830             THEN("there was one subscription and one unsubscription to the emp"){
831                 auto required = rxu::to_vector({
832                     on.subscribe(200, 220)
833                 });
834                 auto actual = emp.subscriptions();
835                 REQUIRE(required == actual);
836             }
837 
838             THEN("there was one subscription and one unsubscription to the err"){
839                 auto required = rxu::to_vector({
840                     on.subscribe(200, 220)
841                 });
842                 auto actual = err.subscriptions();
843                 REQUIRE(required == actual);
844             }
845         }
846     }
847 }
848 
849 SCENARIO("with_latest_from error/empty", "[with_latest_from][join][operators]"){
850     GIVEN("2 hot observables of ints."){
851         auto sc = rxsc::make_test();
852         auto w = sc.create_worker();
853         const rxsc::test::messages<int> on;
854 
855         std::runtime_error ex("with_latest_from on_error from source");
856 
857         auto err = sc.make_hot_observable({
858             on.next(150, 1),
859             on.error(220, ex)
860         });
861 
862         auto emp = sc.make_hot_observable({
863             on.next(150, 1),
864             on.completed(230)
865         });
866 
867         WHEN("each int is combined with the latest from the other source"){
868 
869             auto res = w.start(
__anon9ee5cb451f02() 870                 [&]() {
871                     return err
872                         .with_latest_from(
873                             [](int v2, int v1){
874                                 return v2 + v1;
875                             },
876                             emp
877                         )
878                         // forget type to workaround lambda deduction bug on msvc 2013
879                         .as_dynamic();
880                 }
881             );
882 
883             THEN("the output contains only error message"){
884                 auto required = rxu::to_vector({
885                     on.error(220, ex)
886                 });
887                 auto actual = res.get_observer().messages();
888                 REQUIRE(required == actual);
889             }
890 
891             THEN("there was one subscription and one unsubscription to the emp"){
892                 auto required = rxu::to_vector({
893                     on.subscribe(200, 220)
894                 });
895                 auto actual = emp.subscriptions();
896                 REQUIRE(required == actual);
897             }
898 
899             THEN("there was one subscription and one unsubscription to the err"){
900                 auto required = rxu::to_vector({
901                     on.subscribe(200, 220)
902                 });
903                 auto actual = err.subscriptions();
904                 REQUIRE(required == actual);
905             }
906         }
907     }
908 }
909 
910 SCENARIO("with_latest_from return/error", "[with_latest_from][join][operators]"){
911     GIVEN("2 hot observables of ints."){
912         auto sc = rxsc::make_test();
913         auto w = sc.create_worker();
914         const rxsc::test::messages<int> on;
915 
916         std::runtime_error ex("with_latest_from on_error from source");
917 
918         auto o = sc.make_hot_observable({
919             on.next(150, 1),
920             on.next(210, 2),
921             on.completed(230)
922         });
923 
924         auto err = sc.make_hot_observable({
925             on.next(150, 1),
926             on.error(220, ex)
927         });
928 
929         WHEN("each int is combined with the latest from the other source"){
930 
931             auto res = w.start(
__anon9ee5cb452102() 932                 [&]() {
933                     return o
934                         .with_latest_from(
935                             [](int v2, int v1){
936                                 return v2 + v1;
937                             },
938                             err
939                         )
940                         // forget type to workaround lambda deduction bug on msvc 2013
941                         .as_dynamic();
942                 }
943             );
944 
945             THEN("the output contains only error message"){
946                 auto required = rxu::to_vector({
947                     on.error(220, ex)
948                 });
949                 auto actual = res.get_observer().messages();
950                 REQUIRE(required == actual);
951             }
952 
953             THEN("there was one subscription and one unsubscription to the ret"){
954                 auto required = rxu::to_vector({
955                     on.subscribe(200, 220)
956                 });
957                 auto actual = o.subscriptions();
958                 REQUIRE(required == actual);
959             }
960 
961             THEN("there was one subscription and one unsubscription to the err"){
962                 auto required = rxu::to_vector({
963                     on.subscribe(200, 220)
964                 });
965                 auto actual = err.subscriptions();
966                 REQUIRE(required == actual);
967             }
968         }
969     }
970 }
971 
972 SCENARIO("with_latest_from error/return", "[with_latest_from][join][operators]"){
973     GIVEN("2 hot observables of ints."){
974         auto sc = rxsc::make_test();
975         auto w = sc.create_worker();
976         const rxsc::test::messages<int> on;
977 
978         std::runtime_error ex("with_latest_from on_error from source");
979 
980         auto err = sc.make_hot_observable({
981             on.next(150, 1),
982             on.error(220, ex)
983         });
984 
985         auto ret = sc.make_hot_observable({
986             on.next(150, 1),
987             on.next(210, 2),
988             on.completed(230)
989         });
990 
991         WHEN("each int is combined with the latest from the other source"){
992 
993             auto res = w.start(
__anon9ee5cb452302() 994                 [&]() {
995                     return err
996                         .with_latest_from(
997                             [](int v2, int v1){
998                                 return v2 + v1;
999                             },
1000                             ret
1001                         )
1002                         // forget type to workaround lambda deduction bug on msvc 2013
1003                         .as_dynamic();
1004                 }
1005             );
1006 
1007             THEN("the output contains only error message"){
1008                 auto required = rxu::to_vector({
1009                     on.error(220, ex)
1010                 });
1011                 auto actual = res.get_observer().messages();
1012                 REQUIRE(required == actual);
1013             }
1014 
1015             THEN("there was one subscription and one unsubscription to the ret"){
1016                 auto required = rxu::to_vector({
1017                     on.subscribe(200, 220)
1018                 });
1019                 auto actual = ret.subscriptions();
1020                 REQUIRE(required == actual);
1021             }
1022 
1023             THEN("there was one subscription and one unsubscription to the err"){
1024                 auto required = rxu::to_vector({
1025                     on.subscribe(200, 220)
1026                 });
1027                 auto actual = err.subscriptions();
1028                 REQUIRE(required == actual);
1029             }
1030         }
1031     }
1032 }
1033 
1034 SCENARIO("with_latest_from error/error", "[with_latest_from][join][operators]"){
1035     GIVEN("2 hot observables of ints."){
1036         auto sc = rxsc::make_test();
1037         auto w = sc.create_worker();
1038         const rxsc::test::messages<int> on;
1039 
1040         std::runtime_error ex1("with_latest_from on_error from source 1");
1041         std::runtime_error ex2("with_latest_from on_error from source 2");
1042 
1043         auto err1 = sc.make_hot_observable({
1044             on.next(150, 1),
1045             on.error(220, ex1)
1046         });
1047 
1048         auto err2 = sc.make_hot_observable({
1049             on.next(150, 1),
1050             on.error(230, ex2)
1051         });
1052 
1053         WHEN("each int is combined with the latest from the other source"){
1054 
1055             auto res = w.start(
__anon9ee5cb452502() 1056                 [&]() {
1057                     return err1
1058                         .with_latest_from(
1059                             [](int v2, int v1){
1060                                 return v2 + v1;
1061                             },
1062                             err2
1063                         )
1064                         // forget type to workaround lambda deduction bug on msvc 2013
1065                         .as_dynamic();
1066                 }
1067             );
1068 
1069             THEN("the output contains only error message"){
1070                 auto required = rxu::to_vector({
1071                     on.error(220, ex1)
1072                 });
1073                 auto actual = res.get_observer().messages();
1074                 REQUIRE(required == actual);
1075             }
1076 
1077             THEN("there was one subscription and one unsubscription to the err1"){
1078                 auto required = rxu::to_vector({
1079                     on.subscribe(200, 220)
1080                 });
1081                 auto actual = err1.subscriptions();
1082                 REQUIRE(required == actual);
1083             }
1084 
1085             THEN("there was one subscription and one unsubscription to the err2"){
1086                 auto required = rxu::to_vector({
1087                     on.subscribe(200, 220)
1088                 });
1089                 auto actual = err2.subscriptions();
1090                 REQUIRE(required == actual);
1091             }
1092         }
1093     }
1094 }
1095 
1096 SCENARIO("with_latest_from next+error/error", "[with_latest_from][join][operators]"){
1097     GIVEN("2 hot observables of ints."){
1098         auto sc = rxsc::make_test();
1099         auto w = sc.create_worker();
1100         const rxsc::test::messages<int> on;
1101 
1102         std::runtime_error ex1("with_latest_from on_error from source 1");
1103         std::runtime_error ex2("with_latest_from on_error from source 2");
1104 
1105         auto err1 = sc.make_hot_observable({
1106             on.next(150, 1),
1107             on.next(210, 2),
1108             on.error(220, ex1)
1109         });
1110 
1111         auto err2 = sc.make_hot_observable({
1112             on.next(150, 1),
1113             on.error(230, ex2)
1114         });
1115 
1116         WHEN("each int is combined with the latest from the other source"){
1117 
1118             auto res = w.start(
__anon9ee5cb452702() 1119                 [&]() {
1120                     return err1
1121                         .with_latest_from(
1122                             [](int v2, int v1){
1123                                 return v2 + v1;
1124                             },
1125                             err2
1126                         )
1127                         // forget type to workaround lambda deduction bug on msvc 2013
1128                         .as_dynamic();
1129                 }
1130             );
1131 
1132             THEN("the output contains only error message"){
1133                 auto required = rxu::to_vector({
1134                     on.error(220, ex1)
1135                 });
1136                 auto actual = res.get_observer().messages();
1137                 REQUIRE(required == actual);
1138             }
1139 
1140             THEN("there was one subscription and one unsubscription to the err1"){
1141                 auto required = rxu::to_vector({
1142                     on.subscribe(200, 220)
1143                 });
1144                 auto actual = err1.subscriptions();
1145                 REQUIRE(required == actual);
1146             }
1147 
1148             THEN("there was one subscription and one unsubscription to the err2"){
1149                 auto required = rxu::to_vector({
1150                     on.subscribe(200, 220)
1151                 });
1152                 auto actual = err2.subscriptions();
1153                 REQUIRE(required == actual);
1154             }
1155         }
1156     }
1157 }
1158 
1159 SCENARIO("with_latest_from error/next+error", "[with_latest_from][join][operators]"){
1160     GIVEN("2 hot observables of ints."){
1161         auto sc = rxsc::make_test();
1162         auto w = sc.create_worker();
1163         const rxsc::test::messages<int> on;
1164 
1165         std::runtime_error ex1("with_latest_from on_error from source 1");
1166         std::runtime_error ex2("with_latest_from on_error from source 2");
1167 
1168         auto err1 = sc.make_hot_observable({
1169             on.next(150, 1),
1170             on.error(230, ex1)
1171         });
1172 
1173         auto err2 = sc.make_hot_observable({
1174             on.next(150, 1),
1175             on.next(210, 2),
1176             on.error(220, ex2)
1177         });
1178 
1179         WHEN("each int is combined with the latest from the other source"){
1180 
1181             auto res = w.start(
__anon9ee5cb452902() 1182                 [&]() {
1183                     return err1
1184                         .with_latest_from(
1185                             [](int v2, int v1){
1186                                 return v2 + v1;
1187                             },
1188                             err2
1189                         )
1190                         // forget type to workaround lambda deduction bug on msvc 2013
1191                         .as_dynamic();
1192                 }
1193             );
1194 
1195             THEN("the output contains only error message"){
1196                 auto required = rxu::to_vector({
1197                     on.error(220, ex2)
1198                 });
1199                 auto actual = res.get_observer().messages();
1200                 REQUIRE(required == actual);
1201             }
1202 
1203             THEN("there was one subscription and one unsubscription to the err1"){
1204                 auto required = rxu::to_vector({
1205                     on.subscribe(200, 220)
1206                 });
1207                 auto actual = err1.subscriptions();
1208                 REQUIRE(required == actual);
1209             }
1210 
1211             THEN("there was one subscription and one unsubscription to the err2"){
1212                 auto required = rxu::to_vector({
1213                     on.subscribe(200, 220)
1214                 });
1215                 auto actual = err2.subscriptions();
1216                 REQUIRE(required == actual);
1217             }
1218         }
1219     }
1220 }
1221 
1222 SCENARIO("with_latest_from never/error", "[with_latest_from][join][operators]"){
1223     GIVEN("2 hot observables of ints."){
1224         auto sc = rxsc::make_test();
1225         auto w = sc.create_worker();
1226         const rxsc::test::messages<int> on;
1227 
1228         std::runtime_error ex("with_latest_from on_error from source");
1229 
1230         auto n = sc.make_hot_observable({
1231             on.next(150, 1)
1232         });
1233 
1234         auto err = sc.make_hot_observable({
1235             on.next(150, 1),
1236             on.error(220, ex)
1237         });
1238 
1239         WHEN("each int is combined with the latest from the other source"){
1240 
1241             auto res = w.start(
__anon9ee5cb452b02() 1242                 [&]() {
1243                     return n
1244                         .with_latest_from(
1245                             [](int v2, int v1){
1246                                 return v2 + v1;
1247                             },
1248                             err
1249                         )
1250                         // forget type to workaround lambda deduction bug on msvc 2013
1251                         .as_dynamic();
1252                 }
1253             );
1254 
1255             THEN("the output contains only error message"){
1256                 auto required = rxu::to_vector({
1257                     on.error(220, ex)
1258                 });
1259                 auto actual = res.get_observer().messages();
1260                 REQUIRE(required == actual);
1261             }
1262 
1263             THEN("there was one subscription and one unsubscription to the n"){
1264                 auto required = rxu::to_vector({
1265                     on.subscribe(200, 220)
1266                 });
1267                 auto actual = n.subscriptions();
1268                 REQUIRE(required == actual);
1269             }
1270 
1271             THEN("there was one subscription and one unsubscription to the err"){
1272                 auto required = rxu::to_vector({
1273                     on.subscribe(200, 220)
1274                 });
1275                 auto actual = err.subscriptions();
1276                 REQUIRE(required == actual);
1277             }
1278         }
1279     }
1280 }
1281 
1282 SCENARIO("with_latest_from error/never", "[with_latest_from][join][operators]"){
1283     GIVEN("2 hot observables of ints."){
1284         auto sc = rxsc::make_test();
1285         auto w = sc.create_worker();
1286         const rxsc::test::messages<int> on;
1287 
1288         std::runtime_error ex("with_latest_from on_error from source");
1289 
1290         auto err = sc.make_hot_observable({
1291             on.next(150, 1),
1292             on.error(220, ex)
1293         });
1294 
1295         auto n = sc.make_hot_observable({
1296             on.next(150, 1)
1297         });
1298 
1299         WHEN("each int is combined with the latest from the other source"){
1300 
1301             auto res = w.start(
__anon9ee5cb452d02() 1302                 [&]() {
1303                     return err
1304                         .with_latest_from(
1305                             [](int v2, int v1){
1306                                 return v2 + v1;
1307                             },
1308                             n
1309                         )
1310                         // forget type to workaround lambda deduction bug on msvc 2013
1311                         .as_dynamic();
1312                 }
1313             );
1314 
1315             THEN("the output contains only error message"){
1316                 auto required = rxu::to_vector({
1317                     on.error(220, ex)
1318                 });
1319                 auto actual = res.get_observer().messages();
1320                 REQUIRE(required == actual);
1321             }
1322 
1323             THEN("there was one subscription and one unsubscription to the n"){
1324                 auto required = rxu::to_vector({
1325                     on.subscribe(200, 220)
1326                 });
1327                 auto actual = n.subscriptions();
1328                 REQUIRE(required == actual);
1329             }
1330 
1331             THEN("there was one subscription and one unsubscription to the err"){
1332                 auto required = rxu::to_vector({
1333                     on.subscribe(200, 220)
1334                 });
1335                 auto actual = err.subscriptions();
1336                 REQUIRE(required == actual);
1337             }
1338         }
1339     }
1340 }
1341 
1342 SCENARIO("with_latest_from error after completed left", "[with_latest_from][join][operators]"){
1343     GIVEN("2 hot observables of ints."){
1344         auto sc = rxsc::make_test();
1345         auto w = sc.create_worker();
1346         const rxsc::test::messages<int> on;
1347 
1348         std::runtime_error ex("with_latest_from on_error from source");
1349 
1350         auto ret = sc.make_hot_observable({
1351             on.next(150, 1),
1352             on.next(210, 2),
1353             on.completed(215)
1354         });
1355 
1356         auto err = sc.make_hot_observable({
1357             on.next(150, 1),
1358             on.error(220, ex)
1359         });
1360 
1361         WHEN("each int is combined with the latest from the other source"){
1362 
1363             auto res = w.start(
__anon9ee5cb452f02() 1364                 [&]() {
1365                     return ret
1366                         .with_latest_from(
1367                             [](int v2, int v1){
1368                                 return v2 + v1;
1369                             },
1370                             err
1371                         )
1372                         // forget type to workaround lambda deduction bug on msvc 2013
1373                         .as_dynamic();
1374                 }
1375             );
1376 
1377             THEN("the output contains only error message"){
1378                 auto required = rxu::to_vector({
1379                     on.error(220, ex)
1380                 });
1381                 auto actual = res.get_observer().messages();
1382                 REQUIRE(required == actual);
1383             }
1384 
1385             THEN("there was one subscription and one unsubscription to the ret"){
1386                 auto required = rxu::to_vector({
1387                     on.subscribe(200, 215)
1388                 });
1389                 auto actual = ret.subscriptions();
1390                 REQUIRE(required == actual);
1391             }
1392 
1393             THEN("there was one subscription and one unsubscription to the err"){
1394                 auto required = rxu::to_vector({
1395                     on.subscribe(200, 220)
1396                 });
1397                 auto actual = err.subscriptions();
1398                 REQUIRE(required == actual);
1399             }
1400         }
1401     }
1402 }
1403 
1404 SCENARIO("with_latest_from error after completed right", "[with_latest_from][join][operators]"){
1405     GIVEN("2 hot observables of ints."){
1406         auto sc = rxsc::make_test();
1407         auto w = sc.create_worker();
1408         const rxsc::test::messages<int> on;
1409 
1410         std::runtime_error ex("with_latest_from on_error from source");
1411 
1412         auto err = sc.make_hot_observable({
1413             on.next(150, 1),
1414             on.error(220, ex)
1415         });
1416 
1417         auto ret = sc.make_hot_observable({
1418             on.next(150, 1),
1419             on.next(210, 2),
1420             on.completed(215)
1421         });
1422 
1423         WHEN("each int is combined with the latest from the other source"){
1424 
1425             auto res = w.start(
__anon9ee5cb453102() 1426                 [&]() {
1427                     return err
1428                         .with_latest_from(
1429                             [](int v2, int v1){
1430                                 return v2 + v1;
1431                             },
1432                             ret
1433                         )
1434                         // forget type to workaround lambda deduction bug on msvc 2013
1435                         .as_dynamic();
1436                 }
1437             );
1438 
1439             THEN("the output contains only error message"){
1440                 auto required = rxu::to_vector({
1441                     on.error(220, ex)
1442                 });
1443                 auto actual = res.get_observer().messages();
1444                 REQUIRE(required == actual);
1445             }
1446 
1447             THEN("there was one subscription and one unsubscription to the ret"){
1448                 auto required = rxu::to_vector({
1449                     on.subscribe(200, 215)
1450                 });
1451                 auto actual = ret.subscriptions();
1452                 REQUIRE(required == actual);
1453             }
1454 
1455             THEN("there was one subscription and one unsubscription to the err"){
1456                 auto required = rxu::to_vector({
1457                     on.subscribe(200, 220)
1458                 });
1459                 auto actual = err.subscriptions();
1460                 REQUIRE(required == actual);
1461             }
1462         }
1463     }
1464 }
1465 
1466 SCENARIO("with_latest_from selector throws", "[with_latest_from][join][operators][!throws]"){
1467     GIVEN("2 hot observables of ints."){
1468         auto sc = rxsc::make_test();
1469         auto w = sc.create_worker();
1470         const rxsc::test::messages<int> on;
1471 
1472         std::runtime_error ex("with_latest_from on_error from source");
1473 
1474         auto o1 = sc.make_hot_observable({
1475             on.next(150, 1),
1476             on.next(215, 2),
1477             on.completed(230)
1478         });
1479 
1480         auto o2 = sc.make_hot_observable({
1481             on.next(150, 1),
1482             on.next(220, 3),
1483             on.completed(240)
1484         });
1485 
1486         WHEN("each int is combined with the latest from the other source"){
1487 
1488             auto res = w.start(
__anon9ee5cb453302() 1489                 [&]() {
1490                     return o2
1491                         .with_latest_from(
1492                             [&ex](int, int) -> int {
1493                                 rxu::throw_exception(ex);
1494                             },
1495                             o1
1496                         )
1497                         // forget type to workaround lambda deduction bug on msvc 2013
1498                         .as_dynamic();
1499                 }
1500             );
1501 
1502             THEN("the output contains only error"){
1503                 auto required = rxu::to_vector({
1504                     on.error(220, ex)
1505                 });
1506                 auto actual = res.get_observer().messages();
1507                 REQUIRE(required == actual);
1508             }
1509 
1510             THEN("there was one subscription and one unsubscription to the o1"){
1511                 auto required = rxu::to_vector({
1512                     on.subscribe(200, 220)
1513                 });
1514                 auto actual = o1.subscriptions();
1515                 REQUIRE(required == actual);
1516             }
1517 
1518             THEN("there was one subscription and one unsubscription to the o2"){
1519                 auto required = rxu::to_vector({
1520                     on.subscribe(200, 220)
1521                 });
1522                 auto actual = o2.subscriptions();
1523                 REQUIRE(required == actual);
1524             }
1525         }
1526     }
1527 }
1528 
1529 SCENARIO("with_latest_from selector throws N", "[with_latest_from][join][operators][!throws]"){
1530     GIVEN("N hot observables of ints."){
1531         auto sc = rxsc::make_test();
1532         auto w = sc.create_worker();
1533         const rxsc::test::messages<int> on;
1534 
1535         const int N = 4;
1536 
1537         std::runtime_error ex("with_latest_from on_error from source");
1538 
1539         std::vector<rxcpp::test::testable_observable<int>> e;
1540         for (int i = 0; i < N; ++i) {
1541             e.push_back(
1542                 sc.make_hot_observable({
1543                     on.next(210 + 10 * i, 1),
1544                     on.completed(500)
1545                 })
1546             );
1547         }
1548 
1549         WHEN("each int is combined with the latest from the other source"){
1550 
1551             auto res = w.start(
__anon9ee5cb453502() 1552                 [&]() {
1553                     return e[3]
1554                         .with_latest_from(
1555                             [&ex](int, int, int, int) -> int {
1556                                 rxu::throw_exception(ex);
1557                             },
1558                             e[0], e[1], e[2]
1559                         )
1560                         // forget type to workaround lambda deduction bug on msvc 2013
1561                         .as_dynamic();
1562                 }
1563             );
1564 
1565             THEN("the output contains only error"){
1566                 auto required = rxu::to_vector({
1567                     on.error(200 + 10 * N, ex)
1568                 });
1569                 auto actual = res.get_observer().messages();
1570                 REQUIRE(required == actual);
1571             }
1572 
1573             THEN("there was one subscription and one unsubscription to each observable"){
1574 
__anon9ee5cb453702(rxcpp::test::testable_observable<int> &s)1575                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
1576                     auto required = rxu::to_vector({
1577                         on.subscribe(200, 200 + 10 * N)
1578                     });
1579                     auto actual = s.subscriptions();
1580                     REQUIRE(required == actual);
1581                 });
1582             }
1583         }
1584     }
1585 }
1586 
1587 SCENARIO("with_latest_from typical N", "[with_latest_from][join][operators]"){
1588     GIVEN("N hot observables of ints."){
1589         auto sc = rxsc::make_test();
1590         auto w = sc.create_worker();
1591         const rxsc::test::messages<int> on;
1592 
1593         const int N = 4;
1594 
1595         std::vector<rxcpp::test::testable_observable<int>> o;
1596         for (int i = 0; i < N; ++i) {
1597             o.push_back(
1598                 sc.make_hot_observable({
1599                     on.next(150, 1),
1600                     on.next(210 + 10 * i, i + 1),
1601                     on.next(410 + 10 * i, i + N + 1),
1602                     on.completed(800)
1603                 })
1604             );
1605         }
1606 
1607         WHEN("each int is combined with the latest from the other source"){
1608 
1609             auto res = w.start(
__anon9ee5cb453802() 1610                 [&]() {
1611                     return o[3]
1612                         .with_latest_from(
1613                             [](int v0, int v1, int v2, int v3) {
1614                                 return v0 + v1 + v2 + v3;
1615                             },
1616                             o[0], o[1], o[2]
1617                         )
1618                         // forget type to workaround lambda deduction bug on msvc 2013
1619                         .as_dynamic();
1620                 }
1621             );
1622 
1623             THEN("the output contains combined ints"){
1624                 auto required = rxu::to_vector({
1625                     on.next(200 + 10 * N, N * (N + 1) / 2),
1626                     on.next(410 + 10 * (N - 1), (N - 1) * N / 2 + N + N * N)
1627                 });
1628                 required.push_back(on.completed(800));
1629                 auto actual = res.get_observer().messages();
1630                 REQUIRE(required == actual);
1631             }
1632 
1633             THEN("there was one subscription and one unsubscription to each observable"){
1634 
__anon9ee5cb453a02(rxcpp::test::testable_observable<int> &s)1635                 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){
1636                     auto required = rxu::to_vector({
1637                         on.subscribe(200, 800)
1638                     });
1639                     auto actual = s.subscriptions();
1640                     REQUIRE(required == actual);
1641                 });
1642             }
1643         }
1644     }
1645 }
1646