1 #include "../test.h"
2 #include <rxcpp/operators/rx-zip.hpp>
3 
4 SCENARIO("zip never/never", "[zip][join][operators]"){
5     GIVEN("2 hot observables of ints."){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         auto n1 = sc.make_hot_observable({
11             on.next(150, 1)
12         });
13 
14         auto n2 = sc.make_hot_observable({
15             on.next(150, 1)
16         });
17 
18         WHEN("each int is combined with the latest from the other source"){
19 
20             auto res = w.start(
__anond350a77d0102() 21                 [&]() {
22                     return n1
23                         | rxo::zip(
24                             [](int v2, int v1){
25                                 return v2 + v1;
26                             },
27                             n2
28                         )
29                         // forget type to workaround lambda deduction bug on msvc 2013
30                         | rxo::as_dynamic();
31                 }
32             );
33 
34             THEN("the output is empty"){
35                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
36                 auto actual = res.get_observer().messages();
37                 REQUIRE(required == actual);
38             }
39 
40             THEN("there was one subscription and one unsubscription to the n1"){
41                 auto required = rxu::to_vector({
42                     on.subscribe(200, 1000)
43                 });
44                 auto actual = n1.subscriptions();
45                 REQUIRE(required == actual);
46             }
47 
48             THEN("there was one subscription and one unsubscription to the n2"){
49                 auto required = rxu::to_vector({
50                     on.subscribe(200, 1000)
51                 });
52                 auto actual = n2.subscriptions();
53                 REQUIRE(required == actual);
54             }
55         }
56     }
57 }
58 
59 SCENARIO("zip never N", "[zip][join][operators]"){
60     GIVEN("N never completed hot observables of ints."){
61         auto sc = rxsc::make_test();
62         auto w = sc.create_worker();
63         const rxsc::test::messages<int> on;
64 
65         const std::size_t N = 4;
66 
67         std::vector<rxcpp::test::testable_observable<int>> n;
68         for (std::size_t i = 0; i < N; ++i) {
69             n.push_back(
70                 sc.make_hot_observable({
71                     on.next(150, 1)
72                 })
73             );
74         }
75 
76         WHEN("each int is combined with the latest from the other source"){
77 
78             auto res = w.start(
__anond350a77d0302() 79                 [&]() {
80                     return n[0]
81                         | rxo::zip(
82                             [](int v0, int v1, int v2, int v3){
83                                 return v0 + v1 + v2 + v3;
84                             },
85                             n[1], n[2], n[3]
86                         )
87                         // forget type to workaround lambda deduction bug on msvc 2013
88                         | rxo::as_dynamic();
89                 }
90             );
91 
92             THEN("the output is empty"){
93                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
94                 auto actual = res.get_observer().messages();
95                 REQUIRE(required == actual);
96             }
97 
98             THEN("there was one subscription and one unsubscription to each observable"){
99 
__anond350a77d0502(rxcpp::test::testable_observable<int> &s)100                 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){
101                     auto required = rxu::to_vector({
102                         on.subscribe(200, 1000)
103                     });
104                     auto actual = s.subscriptions();
105                     REQUIRE(required == actual);
106                 });
107             }
108         }
109     }
110 }
111 
112 SCENARIO("zip never/empty", "[zip][join][operators]"){
113     GIVEN("2 hot observables of ints."){
114         auto sc = rxsc::make_test();
115         auto w = sc.create_worker();
116         const rxsc::test::messages<int> on;
117 
118         auto n = sc.make_hot_observable({
119             on.next(150, 1)
120         });
121 
122         auto e = sc.make_hot_observable({
123             on.next(150, 1),
124             on.completed(210)
125         });
126 
127         WHEN("each int is combined with the latest from the other source"){
128 
129             auto res = w.start(
__anond350a77d0602() 130                 [&]() {
131                     return n
132                         .zip(
133                             [](int v2, int v1){
134                                 return v2 + v1;
135                             },
136                             e
137                         )
138                         // forget type to workaround lambda deduction bug on msvc 2013
139                         .as_dynamic();
140                 }
141             );
142 
143             THEN("the output is empty"){
144                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
145                 auto actual = res.get_observer().messages();
146                 REQUIRE(required == actual);
147             }
148 
149             THEN("there was one subscription and one unsubscription to the n"){
150                 auto required = rxu::to_vector({
151                     on.subscribe(200, 1000)
152                 });
153                 auto actual = n.subscriptions();
154                 REQUIRE(required == actual);
155             }
156 
157             THEN("there was one subscription and one unsubscription to the e"){
158                 auto required = rxu::to_vector({
159                     on.subscribe(200, 210)
160                 });
161                 auto actual = e.subscriptions();
162                 REQUIRE(required == actual);
163             }
164         }
165     }
166 }
167 
168 SCENARIO("zip empty/never", "[zip][join][operators]"){
169     GIVEN("2 hot observables of ints."){
170         auto sc = rxsc::make_test();
171         auto w = sc.create_worker();
172         const rxsc::test::messages<int> on;
173 
174         auto e = sc.make_hot_observable({
175             on.next(150, 1),
176             on.completed(210)
177         });
178 
179         auto n = sc.make_hot_observable({
180             on.next(150, 1)
181         });
182 
183         WHEN("each int is combined with the latest from the other source"){
184 
185             auto res = w.start(
__anond350a77d0802() 186                 [&]() {
187                     return e
188                         .zip(
189                             [](int v2, int v1){
190                                 return v2 + v1;
191                             },
192                             n
193                         )
194                         // forget type to workaround lambda deduction bug on msvc 2013
195                         .as_dynamic();
196                 }
197             );
198 
199             THEN("the output is empty"){
200                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
201                 auto actual = res.get_observer().messages();
202                 REQUIRE(required == actual);
203             }
204 
205             THEN("there was one subscription and one unsubscription to the e"){
206                 auto required = rxu::to_vector({
207                     on.subscribe(200, 210)
208                 });
209                 auto actual = e.subscriptions();
210                 REQUIRE(required == actual);
211             }
212 
213             THEN("there was one subscription and one unsubscription to the n"){
214                 auto required = rxu::to_vector({
215                     on.subscribe(200, 1000)
216                 });
217                 auto actual = n.subscriptions();
218                 REQUIRE(required == actual);
219             }
220         }
221     }
222 }
223 
224 SCENARIO("zip empty/empty", "[zip][join][operators]"){
225     GIVEN("2 hot observables of ints."){
226         auto sc = rxsc::make_test();
227         auto w = sc.create_worker();
228         const rxsc::test::messages<int> on;
229 
230         auto e1 = sc.make_hot_observable({
231             on.next(150, 1),
232             on.completed(210)
233         });
234 
235         auto e2 = sc.make_hot_observable({
236             on.next(150, 1),
237             on.completed(210)
238         });
239 
240         WHEN("each int is combined with the latest from the other source"){
241 
242             auto res = w.start(
__anond350a77d0a02() 243                 [&]() {
244                     return e1
245                         .zip(
246                             [](int v2, int v1){
247                                 return v2 + v1;
248                             },
249                             e2
250                         )
251                         // forget type to workaround lambda deduction bug on msvc 2013
252                         .as_dynamic();
253                 }
254             );
255 
256             THEN("the output contains only complete message"){
257                 auto required = rxu::to_vector({
258                     on.completed(210)
259                 });
260                 auto actual = res.get_observer().messages();
261                 REQUIRE(required == actual);
262             }
263 
264             THEN("there was one subscription and one unsubscription to the e"){
265                 auto required = rxu::to_vector({
266                     on.subscribe(200, 210)
267                 });
268                 auto actual = e1.subscriptions();
269                 REQUIRE(required == actual);
270             }
271 
272             THEN("there was one subscription and one unsubscription to the n"){
273                 auto required = rxu::to_vector({
274                     on.subscribe(200, 210)
275                 });
276                 auto actual = e2.subscriptions();
277                 REQUIRE(required == actual);
278             }
279         }
280     }
281 }
282 
283 SCENARIO("zip empty N", "[zip][join][operators]"){
284     GIVEN("N empty hot observables of ints."){
285         auto sc = rxsc::make_test();
286         auto w = sc.create_worker();
287         const rxsc::test::messages<int> on;
288 
289         const int N = 4;
290 
291         std::vector<rxcpp::test::testable_observable<int>> e;
292         for (int i = 0; i < N; ++i) {
293             e.push_back(
294                 sc.make_hot_observable({
295                     on.next(150, 1),
296                     on.completed(210 + 10 * i)
297                 })
298             );
299         }
300 
301         WHEN("each int is combined with the latest from the other source"){
302 
303             auto res = w.start(
__anond350a77d0c02() 304                 [&]() {
305                     return e[0]
306                         .zip(
307                             [](int v0, int v1, int v2, int v3){
308                                 return v0 + v1 + v2 + v3;
309                             },
310                             e[1], e[2], e[3]
311                         )
312                         // forget type to workaround lambda deduction bug on msvc 2013
313                         .as_dynamic();
314                 }
315             );
316 
317             THEN("the output contains only complete message"){
318                 auto required = rxu::to_vector({
319                     on.completed(200 + 10 * N)
320                 });
321                 auto actual = res.get_observer().messages();
322                 REQUIRE(required == actual);
323             }
324 
325             THEN("there was one subscription and one unsubscription to each observable"){
326 
327                 int i = 0;
__anond350a77d0e02(rxcpp::test::testable_observable<int> &s)328                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
329                     auto required = rxu::to_vector({
330                         on.subscribe(200, 200 + 10 * ++i)
331                     });
332                     auto actual = s.subscriptions();
333                     REQUIRE(required == actual);
334                 });
335             }
336         }
337     }
338 }
339 
340 SCENARIO("zip empty/return", "[zip][join][operators]"){
341     GIVEN("2 hot observables of ints."){
342         auto sc = rxsc::make_test();
343         auto w = sc.create_worker();
344         const rxsc::test::messages<int> on;
345 
346         auto e = sc.make_hot_observable({
347             on.next(150, 1),
348             on.completed(210)
349         });
350 
351         auto o = sc.make_hot_observable({
352             on.next(150, 1),
353             on.next(215, 2),
354             on.completed(220)
355         });
356 
357         WHEN("each int is combined with the latest from the other source"){
358 
359             auto res = w.start(
__anond350a77d0f02() 360                 [&]() {
361                     return e
362                         .zip(
363                             [](int v2, int v1){
364                                 return v2 + v1;
365                             },
366                             o
367                         )
368                         // forget type to workaround lambda deduction bug on msvc 2013
369                         .as_dynamic();
370                 }
371             );
372 
373             THEN("the output contains only complete message"){
374                 auto required = rxu::to_vector({
375                     on.completed(215)
376                 });
377                 auto actual = res.get_observer().messages();
378                 REQUIRE(required == actual);
379             }
380 
381             THEN("there was one subscription and one unsubscription to the e"){
382                 auto required = rxu::to_vector({
383                     on.subscribe(200, 210)
384                 });
385                 auto actual = e.subscriptions();
386                 REQUIRE(required == actual);
387             }
388 
389             THEN("there was one subscription and one unsubscription to the o"){
390                 auto required = rxu::to_vector({
391                     on.subscribe(200, 215)
392                 });
393                 auto actual = o.subscriptions();
394                 REQUIRE(required == actual);
395             }
396         }
397     }
398 }
399 
400 SCENARIO("zip return/empty", "[zip][join][operators]"){
401     GIVEN("2 hot observables of ints."){
402         auto sc = rxsc::make_test();
403         auto w = sc.create_worker();
404         const rxsc::test::messages<int> on;
405 
406         auto o = sc.make_hot_observable({
407             on.next(150, 1),
408             on.next(215, 2),
409             on.completed(220)
410         });
411 
412         auto e = sc.make_hot_observable({
413             on.next(150, 1),
414             on.completed(210)
415         });
416 
417         WHEN("each int is combined with the latest from the other source"){
418 
419             auto res = w.start(
__anond350a77d1102() 420                 [&]() {
421                     return o
422                         .zip(
423                             [](int v2, int v1){
424                                 return v2 + v1;
425                             },
426                             e
427                         )
428                         // forget type to workaround lambda deduction bug on msvc 2013
429                         .as_dynamic();
430                 }
431             );
432 
433             THEN("the output contains only complete message"){
434                 auto required = rxu::to_vector({
435                     on.completed(215)
436                 });
437                 auto actual = res.get_observer().messages();
438                 REQUIRE(required == actual);
439             }
440 
441             THEN("there was one subscription and one unsubscription to the o"){
442                 auto required = rxu::to_vector({
443                     on.subscribe(200, 215)
444                 });
445                 auto actual = o.subscriptions();
446                 REQUIRE(required == actual);
447             }
448 
449             THEN("there was one subscription and one unsubscription to the e"){
450                 auto required = rxu::to_vector({
451                     on.subscribe(200, 210)
452                 });
453                 auto actual = e.subscriptions();
454                 REQUIRE(required == actual);
455             }
456         }
457     }
458 }
459 
460 SCENARIO("zip never/return", "[zip][join][operators]"){
461     GIVEN("2 hot observables of ints."){
462         auto sc = rxsc::make_test();
463         auto w = sc.create_worker();
464         const rxsc::test::messages<int> on;
465 
466         auto n = sc.make_hot_observable({
467             on.next(150, 1)
468         });
469 
470         auto o = sc.make_hot_observable({
471             on.next(150, 1),
472             on.next(215, 2),
473             on.completed(220)
474         });
475 
476         WHEN("each int is combined with the latest from the other source"){
477 
478             auto res = w.start(
__anond350a77d1302() 479                 [&]() {
480                     return n
481                         .zip(
482                             [](int v2, int v1){
483                                 return v2 + v1;
484                             },
485                             o
486                         )
487                         // forget type to workaround lambda deduction bug on msvc 2013
488                         .as_dynamic();
489                 }
490             );
491 
492             THEN("the output is empty"){
493                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
494                 auto actual = res.get_observer().messages();
495                 REQUIRE(required == actual);
496             }
497 
498             THEN("there was one subscription and one unsubscription to the n"){
499                 auto required = rxu::to_vector({
500                     on.subscribe(200, 1000)
501                 });
502                 auto actual = n.subscriptions();
503                 REQUIRE(required == actual);
504             }
505 
506             THEN("there was one subscription and one unsubscription to the o"){
507                 auto required = rxu::to_vector({
508                     on.subscribe(200, 220)
509                 });
510                 auto actual = o.subscriptions();
511                 REQUIRE(required == actual);
512             }
513         }
514     }
515 }
516 
517 SCENARIO("zip return/never", "[zip][join][operators]"){
518     GIVEN("2 hot observables of ints."){
519         auto sc = rxsc::make_test();
520         auto w = sc.create_worker();
521         const rxsc::test::messages<int> on;
522 
523         auto o = sc.make_hot_observable({
524             on.next(150, 1),
525             on.next(215, 2),
526             on.completed(220)
527         });
528 
529         auto n = sc.make_hot_observable({
530             on.next(150, 1)
531         });
532 
533         WHEN("each int is combined with the latest from the other source"){
534 
535             auto res = w.start(
__anond350a77d1502() 536                 [&]() {
537                     return o
538                         .zip(
539                             [](int v2, int v1){
540                                 return v2 + v1;
541                             },
542                             n
543                         )
544                         // forget type to workaround lambda deduction bug on msvc 2013
545                         .as_dynamic();
546                 }
547             );
548 
549             THEN("the output is empty"){
550                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
551                 auto actual = res.get_observer().messages();
552                 REQUIRE(required == actual);
553             }
554 
555             THEN("there was one subscription and one unsubscription to the n"){
556                 auto required = rxu::to_vector({
557                     on.subscribe(200, 1000)
558                 });
559                 auto actual = n.subscriptions();
560                 REQUIRE(required == actual);
561             }
562 
563             THEN("there was one subscription and one unsubscription to the o"){
564                 auto required = rxu::to_vector({
565                     on.subscribe(200, 220)
566                 });
567                 auto actual = o.subscriptions();
568                 REQUIRE(required == actual);
569             }
570         }
571     }
572 }
573 
574 SCENARIO("zip return/return", "[zip][join][operators]"){
575     GIVEN("2 hot observables of ints."){
576         auto sc = rxsc::make_test();
577         auto w = sc.create_worker();
578         const rxsc::test::messages<int> on;
579 
580         auto o1 = sc.make_hot_observable({
581             on.next(150, 1),
582             on.next(215, 2),
583             on.completed(230)
584         });
585 
586         auto o2 = sc.make_hot_observable({
587             on.next(150, 1),
588             on.next(220, 3),
589             on.completed(240)
590         });
591 
592         WHEN("each int is combined with the latest from the other source"){
593 
594             auto res = w.start(
__anond350a77d1702() 595                 [&]() {
596                     return o1
597                         .zip(
598                             [](int v2, int v1){
599                              return v2 + v1;
600                             },
601                             o2
602                         )
603                         // forget type to workaround lambda deduction bug on msvc 2013
604                         .as_dynamic();
605                 }
606             );
607 
608             THEN("the output contains combined ints"){
609                 auto required = rxu::to_vector({
610                     on.next(220, 2 + 3),
611                     on.completed(240)
612                 });
613                 auto actual = res.get_observer().messages();
614                 REQUIRE(required == actual);
615             }
616 
617             THEN("there was one subscription and one unsubscription to the o1"){
618                 auto required = rxu::to_vector({
619                     on.subscribe(200, 230)
620                 });
621                 auto actual = o1.subscriptions();
622                 REQUIRE(required == actual);
623             }
624 
625             THEN("there was one subscription and one unsubscription to the o2"){
626                 auto required = rxu::to_vector({
627                     on.subscribe(200, 240)
628                 });
629                 auto actual = o2.subscriptions();
630                 REQUIRE(required == actual);
631             }
632         }
633     }
634 }
635 
636 SCENARIO("zip empty/error", "[zip][join][operators]"){
637     GIVEN("2 hot observables of ints."){
638         auto sc = rxsc::make_test();
639         auto w = sc.create_worker();
640         const rxsc::test::messages<int> on;
641 
642         std::runtime_error ex("zip on_error from source");
643 
644         auto emp = sc.make_hot_observable({
645             on.next(150, 1),
646             on.completed(230)
647         });
648 
649         auto err = sc.make_hot_observable({
650             on.next(150, 1),
651             on.error(220, ex)
652         });
653 
654         WHEN("each int is combined with the latest from the other source"){
655 
656             auto res = w.start(
__anond350a77d1902() 657                 [&]() {
658                     return emp
659                         .zip(
660                             [](int v2, int v1){
661                                 return v2 + v1;
662                             },
663                             err
664                         )
665                         // forget type to workaround lambda deduction bug on msvc 2013
666                         .as_dynamic();
667                 }
668             );
669 
670             THEN("the output contains only error message"){
671                 auto required = rxu::to_vector({
672                     on.error(220, ex)
673                 });
674                 auto actual = res.get_observer().messages();
675                 REQUIRE(required == actual);
676             }
677 
678             THEN("there was one subscription and one unsubscription to the emp"){
679                 auto required = rxu::to_vector({
680                     on.subscribe(200, 220)
681                 });
682                 auto actual = emp.subscriptions();
683                 REQUIRE(required == actual);
684             }
685 
686             THEN("there was one subscription and one unsubscription to the err"){
687                 auto required = rxu::to_vector({
688                     on.subscribe(200, 220)
689                 });
690                 auto actual = err.subscriptions();
691                 REQUIRE(required == actual);
692             }
693         }
694     }
695 }
696 
697 SCENARIO("zip error/empty", "[zip][join][operators]"){
698     GIVEN("2 hot observables of ints."){
699         auto sc = rxsc::make_test();
700         auto w = sc.create_worker();
701         const rxsc::test::messages<int> on;
702 
703         std::runtime_error ex("zip on_error from source");
704 
705         auto err = sc.make_hot_observable({
706             on.next(150, 1),
707             on.error(220, ex)
708         });
709 
710         auto emp = sc.make_hot_observable({
711             on.next(150, 1),
712             on.completed(230)
713         });
714 
715         WHEN("each int is combined with the latest from the other source"){
716 
717             auto res = w.start(
__anond350a77d1b02() 718                 [&]() {
719                     return err
720                         .zip(
721                             [](int v2, int v1){
722                                 return v2 + v1;
723                             },
724                             emp
725                         )
726                         // forget type to workaround lambda deduction bug on msvc 2013
727                         .as_dynamic();
728                 }
729             );
730 
731             THEN("the output contains only error message"){
732                 auto required = rxu::to_vector({
733                     on.error(220, ex)
734                 });
735                 auto actual = res.get_observer().messages();
736                 REQUIRE(required == actual);
737             }
738 
739             THEN("there was one subscription and one unsubscription to the emp"){
740                 auto required = rxu::to_vector({
741                     on.subscribe(200, 220)
742                 });
743                 auto actual = emp.subscriptions();
744                 REQUIRE(required == actual);
745             }
746 
747             THEN("there was one subscription and one unsubscription to the err"){
748                 auto required = rxu::to_vector({
749                     on.subscribe(200, 220)
750                 });
751                 auto actual = err.subscriptions();
752                 REQUIRE(required == actual);
753             }
754         }
755     }
756 }
757 
758 SCENARIO("zip never/error", "[zip][join][operators]"){
759     GIVEN("2 hot observables of ints."){
760         auto sc = rxsc::make_test();
761         auto w = sc.create_worker();
762         const rxsc::test::messages<int> on;
763 
764         std::runtime_error ex("zip on_error from source");
765 
766         auto n = sc.make_hot_observable({
767             on.next(150, 1)
768         });
769 
770         auto err = sc.make_hot_observable({
771             on.next(150, 1),
772             on.error(220, ex)
773         });
774 
775         WHEN("each int is combined with the latest from the other source"){
776 
777             auto res = w.start(
__anond350a77d1d02() 778                 [&]() {
779                     return n
780                         .zip(
781                             [](int v2, int v1){
782                                 return v2 + v1;
783                             },
784                             err
785                         )
786                         // forget type to workaround lambda deduction bug on msvc 2013
787                         .as_dynamic();
788                 }
789             );
790 
791             THEN("the output contains only error message"){
792                 auto required = rxu::to_vector({
793                     on.error(220, ex)
794                 });
795                 auto actual = res.get_observer().messages();
796                 REQUIRE(required == actual);
797             }
798 
799             THEN("there was one subscription and one unsubscription to the n"){
800                 auto required = rxu::to_vector({
801                     on.subscribe(200, 220)
802                 });
803                 auto actual = n.subscriptions();
804                 REQUIRE(required == actual);
805             }
806 
807             THEN("there was one subscription and one unsubscription to the err"){
808                 auto required = rxu::to_vector({
809                     on.subscribe(200, 220)
810                 });
811                 auto actual = err.subscriptions();
812                 REQUIRE(required == actual);
813             }
814         }
815     }
816 }
817 
818 SCENARIO("zip error/never", "[zip][join][operators]"){
819     GIVEN("2 hot observables of ints."){
820         auto sc = rxsc::make_test();
821         auto w = sc.create_worker();
822         const rxsc::test::messages<int> on;
823 
824         std::runtime_error ex("zip on_error from source");
825 
826         auto err = sc.make_hot_observable({
827             on.next(150, 1),
828             on.error(220, ex)
829         });
830 
831         auto n = sc.make_hot_observable({
832             on.next(150, 1)
833         });
834 
835         WHEN("each int is combined with the latest from the other source"){
836 
837             auto res = w.start(
__anond350a77d1f02() 838                 [&]() {
839                     return err
840                         .zip(
841                             [](int v2, int v1){
842                                 return v2 + v1;
843                             },
844                             n
845                         )
846                         // forget type to workaround lambda deduction bug on msvc 2013
847                         .as_dynamic();
848                 }
849             );
850 
851             THEN("the output contains only error message"){
852                 auto required = rxu::to_vector({
853                     on.error(220, ex)
854                 });
855                 auto actual = res.get_observer().messages();
856                 REQUIRE(required == actual);
857             }
858 
859             THEN("there was one subscription and one unsubscription to the n"){
860                 auto required = rxu::to_vector({
861                     on.subscribe(200, 220)
862                 });
863                 auto actual = n.subscriptions();
864                 REQUIRE(required == actual);
865             }
866 
867             THEN("there was one subscription and one unsubscription to the err"){
868                 auto required = rxu::to_vector({
869                     on.subscribe(200, 220)
870                 });
871                 auto actual = err.subscriptions();
872                 REQUIRE(required == actual);
873             }
874         }
875     }
876 }
877 
878 SCENARIO("zip error/error", "[zip][join][operators]"){
879     GIVEN("2 hot observables of ints."){
880         auto sc = rxsc::make_test();
881         auto w = sc.create_worker();
882         const rxsc::test::messages<int> on;
883 
884         std::runtime_error ex1("zip on_error from source 1");
885         std::runtime_error ex2("zip on_error from source 2");
886 
887         auto err1 = sc.make_hot_observable({
888             on.next(150, 1),
889             on.error(220, ex1)
890         });
891 
892         auto err2 = sc.make_hot_observable({
893             on.next(150, 1),
894             on.error(230, ex2)
895         });
896 
897         WHEN("each int is combined with the latest from the other source"){
898 
899             auto res = w.start(
__anond350a77d2102() 900                 [&]() {
901                     return err1
902                         .zip(
903                             [](int v2, int v1){
904                                 return v2 + v1;
905                             },
906                             err2
907                         )
908                         // forget type to workaround lambda deduction bug on msvc 2013
909                         .as_dynamic();
910                 }
911             );
912 
913             THEN("the output contains only error message"){
914                 auto required = rxu::to_vector({
915                     on.error(220, ex1)
916                 });
917                 auto actual = res.get_observer().messages();
918                 REQUIRE(required == actual);
919             }
920 
921             THEN("there was one subscription and one unsubscription to the err1"){
922                 auto required = rxu::to_vector({
923                     on.subscribe(200, 220)
924                 });
925                 auto actual = err1.subscriptions();
926                 REQUIRE(required == actual);
927             }
928 
929             THEN("there was one subscription and one unsubscription to the err2"){
930                 auto required = rxu::to_vector({
931                     on.subscribe(200, 220)
932                 });
933                 auto actual = err2.subscriptions();
934                 REQUIRE(required == actual);
935             }
936         }
937     }
938 }
939 
940 SCENARIO("zip return/error", "[zip][join][operators]"){
941     GIVEN("2 hot observables of ints."){
942         auto sc = rxsc::make_test();
943         auto w = sc.create_worker();
944         const rxsc::test::messages<int> on;
945 
946         std::runtime_error ex("zip on_error from source");
947 
948         auto o = sc.make_hot_observable({
949             on.next(150, 1),
950             on.next(210, 2),
951             on.completed(230)
952         });
953 
954         auto err = sc.make_hot_observable({
955             on.next(150, 1),
956             on.error(220, ex)
957         });
958 
959         WHEN("each int is combined with the latest from the other source"){
960 
961             auto res = w.start(
__anond350a77d2302() 962                 [&]() {
963                     return o
964                         .zip(
965                             [](int v2, int v1){
966                                 return v2 + v1;
967                             },
968                             err
969                         )
970                         // forget type to workaround lambda deduction bug on msvc 2013
971                         .as_dynamic();
972                 }
973             );
974 
975             THEN("the output contains only error message"){
976                 auto required = rxu::to_vector({
977                     on.error(220, ex)
978                 });
979                 auto actual = res.get_observer().messages();
980                 REQUIRE(required == actual);
981             }
982 
983             THEN("there was one subscription and one unsubscription to the ret"){
984                 auto required = rxu::to_vector({
985                     on.subscribe(200, 220)
986                 });
987                 auto actual = o.subscriptions();
988                 REQUIRE(required == actual);
989             }
990 
991             THEN("there was one subscription and one unsubscription to the err"){
992                 auto required = rxu::to_vector({
993                     on.subscribe(200, 220)
994                 });
995                 auto actual = err.subscriptions();
996                 REQUIRE(required == actual);
997             }
998         }
999     }
1000 }
1001 
1002 SCENARIO("zip error/return", "[zip][join][operators]"){
1003     GIVEN("2 hot observables of ints."){
1004         auto sc = rxsc::make_test();
1005         auto w = sc.create_worker();
1006         const rxsc::test::messages<int> on;
1007 
1008         std::runtime_error ex("zip on_error from source");
1009 
1010         auto err = sc.make_hot_observable({
1011             on.next(150, 1),
1012             on.error(220, ex)
1013         });
1014 
1015         auto ret = sc.make_hot_observable({
1016             on.next(150, 1),
1017             on.next(210, 2),
1018             on.completed(230)
1019         });
1020 
1021         WHEN("each int is combined with the latest from the other source"){
1022 
1023             auto res = w.start(
__anond350a77d2502() 1024                 [&]() {
1025                     return err
1026                         .zip(
1027                             [](int v2, int v1){
1028                                 return v2 + v1;
1029                             },
1030                             ret
1031                         )
1032                         // forget type to workaround lambda deduction bug on msvc 2013
1033                         .as_dynamic();
1034                 }
1035             );
1036 
1037             THEN("the output contains only error message"){
1038                 auto required = rxu::to_vector({
1039                     on.error(220, ex)
1040                 });
1041                 auto actual = res.get_observer().messages();
1042                 REQUIRE(required == actual);
1043             }
1044 
1045             THEN("there was one subscription and one unsubscription to the ret"){
1046                 auto required = rxu::to_vector({
1047                     on.subscribe(200, 220)
1048                 });
1049                 auto actual = ret.subscriptions();
1050                 REQUIRE(required == actual);
1051             }
1052 
1053             THEN("there was one subscription and one unsubscription to the err"){
1054                 auto required = rxu::to_vector({
1055                     on.subscribe(200, 220)
1056                 });
1057                 auto actual = err.subscriptions();
1058                 REQUIRE(required == actual);
1059             }
1060         }
1061     }
1062 }
1063 
1064 SCENARIO("zip left completes first", "[zip][join][operators]"){
1065     GIVEN("2 hot observables of ints."){
1066         auto sc = rxsc::make_test();
1067         auto w = sc.create_worker();
1068         const rxsc::test::messages<int> on;
1069 
1070         auto o1 = sc.make_hot_observable({
1071             on.next(150, 1),
1072             on.next(210, 2),
1073             on.completed(220)
1074         });
1075 
1076         auto o2 = sc.make_hot_observable({
1077             on.next(150, 1),
1078             on.next(215, 4),
1079             on.completed(225)
1080         });
1081 
1082         WHEN("each int is combined with the latest from the other source"){
1083 
1084             auto res = w.start(
__anond350a77d2702() 1085                 [&]() {
1086                     return o2
1087                         .zip(
1088                             [](int v2, int v1){
1089                                 return v2 + v1;
1090                             },
1091                             o1
1092                         )
1093                         // forget type to workaround lambda deduction bug on msvc 2013
1094                         .as_dynamic();
1095                 }
1096             );
1097 
1098             THEN("the output contains combined ints"){
1099                 auto required = rxu::to_vector({
1100                     on.next(215, 2 + 4),
1101                     on.completed(225)
1102                 });
1103                 auto actual = res.get_observer().messages();
1104                 REQUIRE(required == actual);
1105             }
1106 
1107             THEN("there was one subscription and one unsubscription to the o1"){
1108                 auto required = rxu::to_vector({
1109                     on.subscribe(200, 220)
1110                 });
1111                 auto actual = o1.subscriptions();
1112                 REQUIRE(required == actual);
1113             }
1114 
1115             THEN("there was one subscription and one unsubscription to the o2"){
1116                 auto required = rxu::to_vector({
1117                     on.subscribe(200, 225)
1118                 });
1119                 auto actual = o2.subscriptions();
1120                 REQUIRE(required == actual);
1121             }
1122         }
1123     }
1124 }
1125 
1126 SCENARIO("zip right completes first", "[zip][join][operators]"){
1127     GIVEN("2 hot observables of ints."){
1128         auto sc = rxsc::make_test();
1129         auto w = sc.create_worker();
1130         const rxsc::test::messages<int> on;
1131 
1132         auto o1 = sc.make_hot_observable({
1133             on.next(150, 1),
1134             on.next(215, 4),
1135             on.completed(225)
1136         });
1137 
1138         auto o2 = sc.make_hot_observable({
1139             on.next(150, 1),
1140             on.next(210, 2),
1141             on.completed(220)
1142         });
1143 
1144         WHEN("each int is combined with the latest from the other source"){
1145 
1146             auto res = w.start(
__anond350a77d2902() 1147                 [&]() {
1148                     return o2
1149                         .zip(
1150                             [](int v2, int v1){
1151                                 return v2 + v1;
1152                             },
1153                             o1
1154                         )
1155                         // forget type to workaround lambda deduction bug on msvc 2013
1156                         .as_dynamic();
1157                 }
1158             );
1159 
1160             THEN("the output contains combined ints"){
1161                 auto required = rxu::to_vector({
1162                     on.next(215, 2 + 4),
1163                     on.completed(225)
1164                 });
1165                 auto actual = res.get_observer().messages();
1166                 REQUIRE(required == actual);
1167             }
1168 
1169             THEN("there was one subscription and one unsubscription to the o1"){
1170                 auto required = rxu::to_vector({
1171                     on.subscribe(200, 225)
1172                 });
1173                 auto actual = o1.subscriptions();
1174                 REQUIRE(required == actual);
1175             }
1176 
1177             THEN("there was one subscription and one unsubscription to the o2"){
1178                 auto required = rxu::to_vector({
1179                     on.subscribe(200, 220)
1180                 });
1181                 auto actual = o2.subscriptions();
1182                 REQUIRE(required == actual);
1183             }
1184         }
1185     }
1186 }
1187 
1188 SCENARIO("zip selector throws", "[zip][join][operators][!throws]"){
1189     GIVEN("2 hot observables of ints."){
1190         auto sc = rxsc::make_test();
1191         auto w = sc.create_worker();
1192         const rxsc::test::messages<int> on;
1193 
1194         std::runtime_error ex("zip on_error from source");
1195 
1196         auto o1 = sc.make_hot_observable({
1197             on.next(150, 1),
1198             on.next(215, 2),
1199             on.completed(230)
1200         });
1201 
1202         auto o2 = sc.make_hot_observable({
1203             on.next(150, 1),
1204             on.next(220, 3),
1205             on.completed(240)
1206         });
1207 
1208         WHEN("each int is combined with the latest from the other source"){
1209 
1210             auto res = w.start(
__anond350a77d2b02() 1211                 [&]() {
1212                     return o1
1213                         .zip(
1214                             [&ex](int, int) -> int {
1215                                 rxu::throw_exception(ex);
1216                             },
1217                             o2
1218                         )
1219                         // forget type to workaround lambda deduction bug on msvc 2013
1220                         .as_dynamic();
1221                 }
1222             );
1223 
1224             THEN("the output contains only error"){
1225                 auto required = rxu::to_vector({
1226                     on.error(220, ex)
1227                 });
1228                 auto actual = res.get_observer().messages();
1229                 REQUIRE(required == actual);
1230             }
1231 
1232             THEN("there was one subscription and one unsubscription to the o1"){
1233                 auto required = rxu::to_vector({
1234                     on.subscribe(200, 220)
1235                 });
1236                 auto actual = o1.subscriptions();
1237                 REQUIRE(required == actual);
1238             }
1239 
1240             THEN("there was one subscription and one unsubscription to the o2"){
1241                 auto required = rxu::to_vector({
1242                     on.subscribe(200, 220)
1243                 });
1244                 auto actual = o2.subscriptions();
1245                 REQUIRE(required == actual);
1246             }
1247         }
1248     }
1249 }
1250 
1251 SCENARIO("zip selector throws N", "[zip][join][operators][!throws]"){
1252     GIVEN("N hot observables of ints."){
1253         auto sc = rxsc::make_test();
1254         auto w = sc.create_worker();
1255         const rxsc::test::messages<int> on;
1256 
1257         const int N = 4;
1258 
1259         std::runtime_error ex("zip on_error from source");
1260 
1261         std::vector<rxcpp::test::testable_observable<int>> e;
1262         for (int i = 0; i < N; ++i) {
1263             e.push_back(
1264                 sc.make_hot_observable({
1265                     on.next(210 + 10 * i, 1),
1266                     on.completed(500)
1267                 })
1268             );
1269         }
1270 
1271         WHEN("each int is combined with the latest from the other source"){
1272 
1273             auto res = w.start(
__anond350a77d2d02() 1274                 [&]() {
1275                     return e[0]
1276                         .zip(
1277                             [&ex](int, int, int, int) -> int {
1278                                 rxu::throw_exception(ex);
1279                             },
1280                             e[1], e[2], e[3]
1281                         )
1282                         // forget type to workaround lambda deduction bug on msvc 2013
1283                         .as_dynamic();
1284                 }
1285             );
1286 
1287             THEN("the output contains only error"){
1288                 auto required = rxu::to_vector({
1289                     on.error(200 + 10 * N, ex)
1290                 });
1291                 auto actual = res.get_observer().messages();
1292                 REQUIRE(required == actual);
1293             }
1294 
1295             THEN("there was one subscription and one unsubscription to each observable"){
1296 
__anond350a77d2f02(rxcpp::test::testable_observable<int> &s)1297                 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){
1298                     auto required = rxu::to_vector({
1299                         on.subscribe(200, 200 + 10 * N)
1300                     });
1301                     auto actual = s.subscriptions();
1302                     REQUIRE(required == actual);
1303                 });
1304             }
1305         }
1306     }
1307 }
1308 
1309 SCENARIO("zip typical N", "[zip][join][operators]"){
1310     GIVEN("N hot observables of ints."){
1311         auto sc = rxsc::make_test();
1312         auto w = sc.create_worker();
1313         const rxsc::test::messages<int> on;
1314 
1315         const int N = 4;
1316 
1317         std::vector<rxcpp::test::testable_observable<int>> o;
1318         for (int i = 0; i < N; ++i) {
1319             o.push_back(
1320                 sc.make_hot_observable({
1321                     on.next(150, 1),
1322                     on.next(210 + 10 * i, i + 1),
1323                     on.next(410 + 10 * i, i + N + 1),
1324                     on.completed(800)
1325                 })
1326             );
1327         }
1328 
1329         WHEN("each int is combined with the latest from the other source"){
1330 
1331             auto res = w.start(
__anond350a77d3002() 1332                 [&]() {
1333                     return o[0]
1334                         .zip(
1335                             [](int v0, int v1, int v2, int v3) {
1336                                 return v0 + v1 + v2 + v3;
1337                             },
1338                             o[1], o[2], o[3]
1339                         )
1340                         // forget type to workaround lambda deduction bug on msvc 2013
1341                         .as_dynamic();
1342                 }
1343             );
1344 
1345             THEN("the output contains combined ints"){
1346                 auto required = rxu::to_vector({
1347                     on.next(200 + 10 * N, N * (N + 1) / 2),
1348                     on.next(400 + 10 * N, N * (3 * N + 1) / 2)
1349                 });
1350                 required.push_back(on.completed(800));
1351                 auto actual = res.get_observer().messages();
1352                 REQUIRE(required == actual);
1353             }
1354 
1355             THEN("there was one subscription and one unsubscription to each observable"){
1356 
__anond350a77d3202(rxcpp::test::testable_observable<int> &s)1357                 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){
1358                     auto required = rxu::to_vector({
1359                         on.subscribe(200, 800)
1360                     });
1361                     auto actual = s.subscriptions();
1362                     REQUIRE(required == actual);
1363                 });
1364             }
1365         }
1366     }
1367 }
1368 
1369 SCENARIO("zip interleaved with tail", "[zip][join][operators]"){
1370     GIVEN("2 hot observables of ints."){
1371         auto sc = rxsc::make_test();
1372         auto w = sc.create_worker();
1373         const rxsc::test::messages<int> on;
1374 
1375         auto o1 = sc.make_hot_observable({
1376             on.next(150, 1),
1377             on.next(215, 2),
1378             on.next(225, 4),
1379             on.completed(230)
1380         });
1381 
1382         auto o2 = sc.make_hot_observable({
1383             on.next(150, 1),
1384             on.next(220, 3),
1385             on.next(230, 5),
1386             on.next(235, 6),
1387             on.next(240, 7),
1388             on.completed(250)
1389         });
1390 
1391         WHEN("each int is combined with the latest from the other source"){
1392 
1393             auto res = w.start(
__anond350a77d3302() 1394                 [&]() {
1395                     return o2
1396                         .zip(
1397                             [](int v2, int v1){
1398                                 return v2 + v1;
1399                             },
1400                             o1
1401                         )
1402                         // forget type to workaround lambda deduction bug on msvc 2013
1403                         .as_dynamic();
1404                 }
1405             );
1406 
1407             THEN("the output contains combined ints"){
1408                 auto required = rxu::to_vector({
1409                     on.next(220, 2 + 3),
1410                     on.next(230, 4 + 5),
1411                     on.completed(230)
1412                 });
1413                 auto actual = res.get_observer().messages();
1414                 REQUIRE(required == actual);
1415             }
1416 
1417             THEN("there was one subscription and one unsubscription to the o1"){
1418                 auto required = rxu::to_vector({
1419                     on.subscribe(200, 230)
1420                 });
1421                 auto actual = o1.subscriptions();
1422                 REQUIRE(required == actual);
1423             }
1424 
1425             THEN("there was one subscription and one unsubscription to the o2"){
1426                 auto required = rxu::to_vector({
1427                     on.subscribe(200, 230)
1428                 });
1429                 auto actual = o2.subscriptions();
1430                 REQUIRE(required == actual);
1431             }
1432         }
1433     }
1434 }
1435 
1436 SCENARIO("zip consecutive", "[zip][join][operators]"){
1437     GIVEN("2 hot observables of ints."){
1438         auto sc = rxsc::make_test();
1439         auto w = sc.create_worker();
1440         const rxsc::test::messages<int> on;
1441 
1442         auto o1 = sc.make_hot_observable({
1443             on.next(150, 1),
1444             on.next(215, 2),
1445             on.next(225, 4),
1446             on.completed(230)
1447         });
1448 
1449         auto o2 = sc.make_hot_observable({
1450             on.next(150, 1),
1451             on.next(235, 6),
1452             on.next(240, 7),
1453             on.completed(250)
1454         });
1455 
1456         WHEN("each int is combined with the latest from the other source"){
1457 
1458             auto res = w.start(
__anond350a77d3502() 1459                 [&]() {
1460                     return o2
1461                         .zip(
1462                             [](int v2, int v1){
1463                                 return v2 + v1;
1464                             },
1465                             o1
1466                         )
1467                         // forget type to workaround lambda deduction bug on msvc 2013
1468                         .as_dynamic();
1469                 }
1470             );
1471 
1472             THEN("the output contains combined ints"){
1473                 auto required = rxu::to_vector({
1474                     on.next(235, 2 + 6),
1475                     on.next(240, 4 + 7),
1476                     on.completed(240)
1477                 });
1478                 auto actual = res.get_observer().messages();
1479                 REQUIRE(required == actual);
1480             }
1481 
1482             THEN("there was one subscription and one unsubscription to the o1"){
1483                 auto required = rxu::to_vector({
1484                     on.subscribe(200, 230)
1485                 });
1486                 auto actual = o1.subscriptions();
1487                 REQUIRE(required == actual);
1488             }
1489 
1490             THEN("there was one subscription and one unsubscription to the o2"){
1491                 auto required = rxu::to_vector({
1492                     on.subscribe(200, 240)
1493                 });
1494                 auto actual = o2.subscriptions();
1495                 REQUIRE(required == actual);
1496             }
1497         }
1498     }
1499 }
1500 
1501 SCENARIO("zip consecutive ends with error left", "[zip][join][operators]"){
1502     GIVEN("2 hot observables of ints."){
1503         auto sc = rxsc::make_test();
1504         auto w = sc.create_worker();
1505         const rxsc::test::messages<int> on;
1506 
1507         std::runtime_error ex("zip on_error from source");
1508 
1509         auto o1 = sc.make_hot_observable({
1510             on.next(150, 1),
1511             on.next(215, 2),
1512             on.next(225, 4),
1513             on.error(230, ex)
1514         });
1515 
1516         auto o2 = sc.make_hot_observable({
1517             on.next(150, 1),
1518             on.next(235, 6),
1519             on.next(240, 7),
1520             on.completed(250)
1521         });
1522 
1523         WHEN("each int is combined with the latest from the other source"){
1524 
1525             auto res = w.start(
__anond350a77d3702() 1526                 [&]() {
1527                     return o2
1528                         .zip(
1529                             [](int v2, int v1){
1530                                 return v2 + v1;
1531                             },
1532                             o1
1533                         )
1534                         // forget type to workaround lambda deduction bug on msvc 2013
1535                         .as_dynamic();
1536                 }
1537             );
1538 
1539             THEN("the output contains only an error"){
1540                 auto required = rxu::to_vector({
1541                     on.error(230, ex)
1542                 });
1543                 auto actual = res.get_observer().messages();
1544                 REQUIRE(required == actual);
1545             }
1546 
1547             THEN("there was one subscription and one unsubscription to the o1"){
1548                 auto required = rxu::to_vector({
1549                     on.subscribe(200, 230)
1550                 });
1551                 auto actual = o1.subscriptions();
1552                 REQUIRE(required == actual);
1553             }
1554 
1555             THEN("there was one subscription and one unsubscription to the o2"){
1556                 auto required = rxu::to_vector({
1557                     on.subscribe(200, 230)
1558                 });
1559                 auto actual = o2.subscriptions();
1560                 REQUIRE(required == actual);
1561             }
1562         }
1563     }
1564 }
1565 
1566 SCENARIO("zip consecutive ends with error right", "[zip][join][operators]"){
1567     GIVEN("2 hot observables of ints."){
1568         auto sc = rxsc::make_test();
1569         auto w = sc.create_worker();
1570         const rxsc::test::messages<int> on;
1571 
1572         std::runtime_error ex("zip on_error from source");
1573 
1574         auto o1 = sc.make_hot_observable({
1575             on.next(150, 1),
1576             on.next(215, 2),
1577             on.next(225, 4),
1578             on.completed(250)
1579         });
1580 
1581         auto o2 = sc.make_hot_observable({
1582             on.next(150, 1),
1583             on.next(235, 6),
1584             on.next(240, 7),
1585             on.error(245, ex)
1586         });
1587 
1588         WHEN("each int is combined with the latest from the other source"){
1589 
1590             auto res = w.start(
__anond350a77d3902() 1591                 [&]() {
1592                     return o2
1593                         .zip(
1594                             [](int v2, int v1){
1595                                 return v2 + v1;
1596                             },
1597                             o1
1598                         )
1599                         // forget type to workaround lambda deduction bug on msvc 2013
1600                         .as_dynamic();
1601                 }
1602             );
1603 
1604             THEN("the output contains combined ints followed by an error"){
1605                 auto required = rxu::to_vector({
1606                     on.next(235, 2 + 6),
1607                     on.next(240, 4 + 7),
1608                     on.error(245, ex)
1609                 });
1610                 auto actual = res.get_observer().messages();
1611                 REQUIRE(required == actual);
1612             }
1613 
1614             THEN("there was one subscription and one unsubscription to the o1"){
1615                 auto required = rxu::to_vector({
1616                     on.subscribe(200, 245)
1617                 });
1618                 auto actual = o1.subscriptions();
1619                 REQUIRE(required == actual);
1620             }
1621 
1622             THEN("there was one subscription and one unsubscription to the o2"){
1623                 auto required = rxu::to_vector({
1624                     on.subscribe(200, 245)
1625                 });
1626                 auto actual = o2.subscriptions();
1627                 REQUIRE(required == actual);
1628             }
1629         }
1630     }
1631 }
1632 
1633 SCENARIO("zip next+error/error", "[zip][join][operators]"){
1634     GIVEN("2 hot observables of ints."){
1635         auto sc = rxsc::make_test();
1636         auto w = sc.create_worker();
1637         const rxsc::test::messages<int> on;
1638 
1639         std::runtime_error ex1("zip on_error from source 1");
1640         std::runtime_error ex2("zip on_error from source 2");
1641 
1642         auto err1 = sc.make_hot_observable({
1643             on.next(150, 1),
1644             on.next(210, 2),
1645             on.error(220, ex1)
1646         });
1647 
1648         auto err2 = sc.make_hot_observable({
1649             on.next(150, 1),
1650             on.error(230, ex2)
1651         });
1652 
1653         WHEN("each int is combined with the latest from the other source"){
1654 
1655             auto res = w.start(
__anond350a77d3b02() 1656                 [&]() {
1657                     return err1
1658                         .zip(
1659                             [](int v2, int v1){
1660                                 return v2 + v1;
1661                             },
1662                             err2
1663                         )
1664                         // forget type to workaround lambda deduction bug on msvc 2013
1665                         .as_dynamic();
1666                 }
1667             );
1668 
1669             THEN("the output contains only error message"){
1670                 auto required = rxu::to_vector({
1671                     on.error(220, ex1)
1672                 });
1673                 auto actual = res.get_observer().messages();
1674                 REQUIRE(required == actual);
1675             }
1676 
1677             THEN("there was one subscription and one unsubscription to the err1"){
1678                 auto required = rxu::to_vector({
1679                     on.subscribe(200, 220)
1680                 });
1681                 auto actual = err1.subscriptions();
1682                 REQUIRE(required == actual);
1683             }
1684 
1685             THEN("there was one subscription and one unsubscription to the err2"){
1686                 auto required = rxu::to_vector({
1687                     on.subscribe(200, 220)
1688                 });
1689                 auto actual = err2.subscriptions();
1690                 REQUIRE(required == actual);
1691             }
1692         }
1693     }
1694 }
1695 
1696 SCENARIO("zip error/next+error", "[zip][join][operators]"){
1697     GIVEN("2 hot observables of ints."){
1698         auto sc = rxsc::make_test();
1699         auto w = sc.create_worker();
1700         const rxsc::test::messages<int> on;
1701 
1702         std::runtime_error ex1("zip on_error from source 1");
1703         std::runtime_error ex2("zip on_error from source 2");
1704 
1705         auto err1 = sc.make_hot_observable({
1706             on.next(150, 1),
1707             on.error(230, ex1)
1708         });
1709 
1710         auto err2 = sc.make_hot_observable({
1711             on.next(150, 1),
1712             on.next(210, 2),
1713             on.error(220, ex2)
1714         });
1715 
1716         WHEN("each int is combined with the latest from the other source"){
1717 
1718             auto res = w.start(
__anond350a77d3d02() 1719                 [&]() {
1720                     return err1
1721                         .zip(
1722                             [](int v2, int v1){
1723                                 return v2 + v1;
1724                             },
1725                             err2
1726                         )
1727                         // forget type to workaround lambda deduction bug on msvc 2013
1728                         .as_dynamic();
1729                 }
1730             );
1731 
1732             THEN("the output contains only error message"){
1733                 auto required = rxu::to_vector({
1734                     on.error(220, ex2)
1735                 });
1736                 auto actual = res.get_observer().messages();
1737                 REQUIRE(required == actual);
1738             }
1739 
1740             THEN("there was one subscription and one unsubscription to the err1"){
1741                 auto required = rxu::to_vector({
1742                     on.subscribe(200, 220)
1743                 });
1744                 auto actual = err1.subscriptions();
1745                 REQUIRE(required == actual);
1746             }
1747 
1748             THEN("there was one subscription and one unsubscription to the err2"){
1749                 auto required = rxu::to_vector({
1750                     on.subscribe(200, 220)
1751                 });
1752                 auto actual = err2.subscriptions();
1753                 REQUIRE(required == actual);
1754             }
1755         }
1756     }
1757 }
1758 
1759 SCENARIO("zip error after completed left", "[zip][join][operators]"){
1760     GIVEN("2 hot observables of ints."){
1761         auto sc = rxsc::make_test();
1762         auto w = sc.create_worker();
1763         const rxsc::test::messages<int> on;
1764 
1765         std::runtime_error ex("zip on_error from source");
1766 
1767         auto ret = sc.make_hot_observable({
1768             on.next(150, 1),
1769             on.next(210, 2),
1770             on.completed(215)
1771         });
1772 
1773         auto err = sc.make_hot_observable({
1774             on.next(150, 1),
1775             on.error(220, ex)
1776         });
1777 
1778         WHEN("each int is combined with the latest from the other source"){
1779 
1780             auto res = w.start(
__anond350a77d3f02() 1781                 [&]() {
1782                     return ret
1783                         .zip(
1784                             [](int v2, int v1){
1785                                 return v2 + v1;
1786                             },
1787                             err
1788                         )
1789                         // forget type to workaround lambda deduction bug on msvc 2013
1790                         .as_dynamic();
1791                 }
1792             );
1793 
1794             THEN("the output contains only error message"){
1795                 auto required = rxu::to_vector({
1796                     on.error(220, ex)
1797                 });
1798                 auto actual = res.get_observer().messages();
1799                 REQUIRE(required == actual);
1800             }
1801 
1802             THEN("there was one subscription and one unsubscription to the ret"){
1803                 auto required = rxu::to_vector({
1804                     on.subscribe(200, 215)
1805                 });
1806                 auto actual = ret.subscriptions();
1807                 REQUIRE(required == actual);
1808             }
1809 
1810             THEN("there was one subscription and one unsubscription to the err"){
1811                 auto required = rxu::to_vector({
1812                     on.subscribe(200, 220)
1813                 });
1814                 auto actual = err.subscriptions();
1815                 REQUIRE(required == actual);
1816             }
1817         }
1818     }
1819 }
1820 
1821 SCENARIO("zip error after completed right", "[zip][join][operators]"){
1822     GIVEN("2 hot observables of ints."){
1823         auto sc = rxsc::make_test();
1824         auto w = sc.create_worker();
1825         const rxsc::test::messages<int> on;
1826 
1827         std::runtime_error ex("zip on_error from source");
1828 
1829         auto err = sc.make_hot_observable({
1830             on.next(150, 1),
1831             on.error(220, ex)
1832         });
1833 
1834         auto ret = sc.make_hot_observable({
1835             on.next(150, 1),
1836             on.next(210, 2),
1837             on.completed(215)
1838         });
1839 
1840         WHEN("each int is combined with the latest from the other source"){
1841 
1842             auto res = w.start(
__anond350a77d4102() 1843                 [&]() {
1844                     return err
1845                         .zip(
1846                             [](int v2, int v1){
1847                                 return v2 + v1;
1848                             },
1849                             ret
1850                         )
1851                         // forget type to workaround lambda deduction bug on msvc 2013
1852                         .as_dynamic();
1853                 }
1854             );
1855 
1856             THEN("the output contains only error message"){
1857                 auto required = rxu::to_vector({
1858                     on.error(220, ex)
1859                 });
1860                 auto actual = res.get_observer().messages();
1861                 REQUIRE(required == actual);
1862             }
1863 
1864             THEN("there was one subscription and one unsubscription to the ret"){
1865                 auto required = rxu::to_vector({
1866                     on.subscribe(200, 215)
1867                 });
1868                 auto actual = ret.subscriptions();
1869                 REQUIRE(required == actual);
1870             }
1871 
1872             THEN("there was one subscription and one unsubscription to the err"){
1873                 auto required = rxu::to_vector({
1874                     on.subscribe(200, 220)
1875                 });
1876                 auto actual = err.subscriptions();
1877                 REQUIRE(required == actual);
1878             }
1879         }
1880     }
1881 }
1882