1 #include "../test.h"
2 #include "rxcpp/operators/rx-sample_time.hpp"
3 
4 SCENARIO("sample with time, error", "[sample_with_time][operators]"){
5     GIVEN("1 hot observable of ints."){
6         auto sc = rxsc::make_test();
7         auto so = rx::synchronize_in_one_worker(sc);
8         auto w = sc.create_worker();
9         const rxsc::test::messages<int> on;
10 
11         std::runtime_error ex("sample_with_time on_error from source");
12 
13         auto xs = sc.make_hot_observable({
14             on.next(100, 1),
15             on.next(210, 2),
16             on.next(240, 3),
17             on.next(280, 4),
18             on.next(320, 5),
19             on.next(350, 6),
20             on.next(380, 7),
21             on.next(420, 8),
22             on.next(470, 9),
23             on.error(600, ex)
24         });
25         WHEN("group ints on intersecting intervals"){
26             using namespace std::chrono;
27 
28             auto res = w.start(
__anon94cd60ac0102() 29                 [&]() {
30                     return xs
31                         | rxo::sample_with_time(milliseconds(100), so)
32                         | rxo::as_dynamic();
33                 }
34             );
35 
36             THEN("the output contains groups of ints"){
37                 auto required = rxu::to_vector({
38                     on.next(301, 4),
39                     on.next(401, 7),
40                     on.next(501, 9),
41                     on.error(601, ex)
42                 });
43                 auto actual = res.get_observer().messages();
44                 REQUIRE(required == actual);
45             }
46 
47             THEN("there was one subscription and one unsubscription to the xs"){
48                 auto required = rxu::to_vector({
49                     on.subscribe(200, 600)
50                 });
51                 auto actual = xs.subscriptions();
52                 REQUIRE(required == actual);
53             }
54         }
55     }
56 }
57 
58 SCENARIO("sample with time, disposed", "[sample_with_time][operators]"){
59     GIVEN("1 hot observable of ints."){
60         auto sc = rxsc::make_test();
61         auto so = rx::synchronize_in_one_worker(sc);
62         auto w = sc.create_worker();
63         const rxsc::test::messages<int> on;
64 
65         auto xs = sc.make_hot_observable({
66             on.next(100, 1),
67             on.next(210, 2),
68             on.next(240, 3),
69             on.next(280, 4), //
70             on.next(320, 5),
71             on.next(350, 6),
72             on.next(380, 7),
73             on.next(420, 8),
74             on.next(470, 9),
75             on.completed(600)
76         });
77         WHEN("group ints on intersecting intervals"){
78             using namespace std::chrono;
79 
80             auto res = w.start(
__anon94cd60ac0202() 81                 [&]() {
82                     return xs
83                         .sample_with_time(milliseconds(100), so)
84                         .as_dynamic();
85                 },
86                 370
87             );
88 
89             THEN("the output contains groups of ints"){
90                 auto required = rxu::to_vector({
91                     on.next(301, 4),
92                 });
93                 auto actual = res.get_observer().messages();
94                 REQUIRE(required == actual);
95             }
96 
97             THEN("there was one subscription and one unsubscription to the xs"){
98                 auto required = rxu::to_vector({
99                     on.subscribe(200, 371)
100                 });
101                 auto actual = xs.subscriptions();
102                 REQUIRE(required == actual);
103             }
104         }
105     }
106 }
107 
108 SCENARIO("sample with time, same", "[sample_with_time][operators]"){
109     GIVEN("1 hot observable of ints."){
110         auto sc = rxsc::make_test();
111         auto so = rx::synchronize_in_one_worker(sc);
112         auto w = sc.create_worker();
113         const rxsc::test::messages<int> on;
114         const rxsc::test::messages<std::vector<int>> v_on;
115 
116         auto xs = sc.make_hot_observable({
117             on.next(100, 1),
118             on.next(210, 2),
119             on.next(240, 3),
120             on.next(280, 4),
121             on.next(320, 5),
122             on.next(350, 6),
123             on.next(380, 7),
124             on.next(420, 8),
125             on.next(470, 9),
126             on.completed(600)
127         });
128         WHEN("group ints on intervals"){
129             using namespace std::chrono;
130 
131             auto res = w.start(
__anon94cd60ac0302() 132                 [&]() {
133                     return xs
134                         .sample_with_time(milliseconds(100), so)
135                         .as_dynamic();
136                 }
137             );
138 
139             THEN("the output contains groups of ints"){
140                 auto required = rxu::to_vector({
141                     on.next(301, 4),
142                     on.next(401, 7),
143                     on.next(501, 9),
144                     on.completed(601)
145                 });
146                 auto actual = res.get_observer().messages();
147                 REQUIRE(required == actual);
148             }
149 
150             THEN("there was one subscription and one unsubscription to the xs"){
151                 auto required = rxu::to_vector({
152                     on.subscribe(200, 600)
153                 });
154                 auto actual = xs.subscriptions();
155                 REQUIRE(required == actual);
156             }
157         }
158     }
159 }
160