1 #include "../test.h"
2 #include <rxcpp/operators/rx-map.hpp>
3 #include <rxcpp/operators/rx-merge.hpp>
4 #include <rxcpp/operators/rx-window_toggle.hpp>
5 
6 SCENARIO("window toggle, basic", "[window_toggle][operators]"){
7     GIVEN("1 hot observable of ints and hot observable of opens."){
8         auto sc = rxsc::make_test();
9         auto so = rx::synchronize_in_one_worker(sc);
10         auto w = sc.create_worker();
11         const rxsc::test::messages<int> on;
12         const rxsc::test::messages<std::string> o_on;
13 
14         auto xs = sc.make_hot_observable({
15             on.next(90, 1),
16             on.next(180, 2),
17             on.next(250, 3),
18             on.next(260, 4),
19             on.next(310, 5),
20             on.next(340, 6),
21             on.next(410, 7),
22             on.next(420, 8),
23             on.next(470, 9),
24             on.next(550, 10),
25             on.completed(590)
26         });
27 
28         auto ys = sc.make_hot_observable({
29             on.next(255, 50),
30             on.next(330, 100),
31             on.next(350, 50),
32             on.next(400, 90),
33             on.completed(900)
34         });
35 
36         WHEN("ints are split into windows"){
37             using namespace std::chrono;
38 
39             int wi = 0;
40 
41             auto res = w.start(
__anone54b1b630102() 42                 [&]() {
43                     return xs
44                         | rxo::window_toggle(ys, [&](int y){
45                             return rx::observable<>::timer(milliseconds(y), so);
46                         }, so)
47                         | rxo::map([wi](rxcpp::observable<int> w) mutable {
48                             auto ti = wi++;
49                             return w
50                                 | rxo::map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
51                                 // forget type to workaround lambda deduction bug on msvc 2013
52                                 | rxo::as_dynamic();
53                         })
54                         | rxo::merge()
55                         // forget type to workaround lambda deduction bug on msvc 2013
56                         | rxo::as_dynamic();
57                 }
58             );
59 
60             THEN("the output contains ints assigned to windows"){
61                 auto required = rxu::to_vector({
62                     o_on.next(261, "0 4"),
63                     o_on.next(341, "1 6"),
64                     o_on.next(411, "1 7"),
65                     o_on.next(411, "3 7"),
66                     o_on.next(421, "1 8"),
67                     o_on.next(421, "3 8"),
68                     o_on.next(471, "3 9"),
69                     o_on.completed(591)
70                 });
71                 auto actual = res.get_observer().messages();
72                 REQUIRE(required == actual);
73             }
74 
75             THEN("there was one subscription and one unsubscription to the observable"){
76                 auto required = rxu::to_vector({
77                     o_on.subscribe(200, 590)
78                 });
79                 auto actual = xs.subscriptions();
80                 REQUIRE(required == actual);
81             }
82         }
83     }
84 }
85 
86 SCENARIO("window toggle, basic same", "[window_toggle][operators]"){
87     GIVEN("1 hot observable of ints and hot observable of opens."){
88         auto sc = rxsc::make_test();
89         auto so = rx::synchronize_in_one_worker(sc);
90         auto w = sc.create_worker();
91         const rxsc::test::messages<int> on;
92         const rxsc::test::messages<std::string> o_on;
93 
94         auto xs = sc.make_hot_observable({
95             on.next(90, 1),
96             on.next(180, 2),
97             on.next(250, 3),
98             on.next(260, 4),
99             on.next(310, 5),
100             on.next(340, 6),
101             on.next(410, 7),
102             on.next(420, 8),
103             on.next(470, 9),
104             on.next(550, 10),
105             on.completed(590)
106         });
107 
108         auto ys = sc.make_hot_observable({
109             on.next(255, 50),
110             on.next(330, 100),
111             on.next(350, 50),
112             on.next(400, 90),
113             on.completed(900)
114         });
115 
116         WHEN("ints are split into windows"){
117             using namespace std::chrono;
118 
119             int wi = 0;
120 
121             auto res = w.start(
__anone54b1b630502() 122                 [&]() {
123                     return xs
124                         .window_toggle(ys, [&](int){
125                             return ys;
126                         }, so)
127                         .map([wi](rxcpp::observable<int> w) mutable {
128                             auto ti = wi++;
129                             return w
130                                 .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
131                                 // forget type to workaround lambda deduction bug on msvc 2013
132                                 .as_dynamic();
133                         })
134                         .merge()
135                         // forget type to workaround lambda deduction bug on msvc 2013
136                         .as_dynamic();
137                 }
138             );
139 
140             THEN("the output contains ints assigned to windows"){
141                 auto required = rxu::to_vector({
142                     o_on.next(261, "0 4"),
143                     o_on.next(311, "0 5"),
144                     o_on.next(341, "1 6"),
145                     o_on.next(411, "3 7"),
146                     o_on.next(421, "3 8"),
147                     o_on.next(471, "3 9"),
148                     o_on.next(551, "3 10"),
149                     o_on.completed(591)
150                 });
151                 auto actual = res.get_observer().messages();
152                 REQUIRE(required == actual);
153             }
154 
155             THEN("there was one subscription and one unsubscription to the observable"){
156                 auto required = rxu::to_vector({
157                     o_on.subscribe(200, 590)
158                 });
159                 auto actual = xs.subscriptions();
160                 REQUIRE(required == actual);
161             }
162         }
163     }
164 }
165 
166 SCENARIO("window toggle, error", "[window_toggle][operators]"){
167     GIVEN("1 hot observable of ints and hot observable of opens."){
168         auto sc = rxsc::make_test();
169         auto so = rx::synchronize_in_one_worker(sc);
170         auto w = sc.create_worker();
171         const rxsc::test::messages<int> on;
172         const rxsc::test::messages<std::string> o_on;
173 
174         std::runtime_error ex("window_toggle on_error from source");
175 
176         auto xs = sc.make_hot_observable({
177             on.next(90, 1),
178             on.next(180, 2),
179             on.next(250, 3),
180             on.next(260, 4),
181             on.next(310, 5),
182             on.next(340, 6),
183             on.next(410, 7),
184             on.error(420, ex),
185             on.next(470, 9),
186             on.next(550, 10),
187             on.completed(590)
188         });
189 
190         auto ys = sc.make_hot_observable({
191             on.next(255, 50),
192             on.next(330, 100),
193             on.next(350, 50),
194             on.next(400, 90),
195             on.completed(900)
196         });
197 
198         WHEN("ints are split into windows"){
199             using namespace std::chrono;
200 
201             int wi = 0;
202 
203             auto res = w.start(
__anone54b1b630902() 204                 [&]() {
205                     return xs
206                         .window_toggle(ys, [&](int){
207                             return ys;
208                         }, so)
209                         .map([wi](rxcpp::observable<int> w) mutable {
210                             auto ti = wi++;
211                             return w
212                                 .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
213                                 // forget type to workaround lambda deduction bug on msvc 2013
214                                 .as_dynamic();
215                         })
216                         .merge()
217                         // forget type to workaround lambda deduction bug on msvc 2013
218                         .as_dynamic();
219                 }
220             );
221 
222             THEN("the output contains ints assigned to windows"){
223                 auto required = rxu::to_vector({
224                     o_on.next(261, "0 4"),
225                     o_on.next(311, "0 5"),
226                     o_on.next(341, "1 6"),
227                     o_on.next(411, "3 7"),
228                     o_on.error(421, ex)
229                 });
230                 auto actual = res.get_observer().messages();
231                 REQUIRE(required == actual);
232             }
233 
234             THEN("there was one subscription and one unsubscription to the observable"){
235                 auto required = rxu::to_vector({
236                     o_on.subscribe(200, 420)
237                 });
238                 auto actual = xs.subscriptions();
239                 REQUIRE(required == actual);
240             }
241         }
242     }
243 }
244 
245 SCENARIO("window toggle, disposed", "[window_toggle][operators]"){
246     GIVEN("1 hot observable of ints and hot observable of opens."){
247         auto sc = rxsc::make_test();
248         auto so = rx::synchronize_in_one_worker(sc);
249         auto w = sc.create_worker();
250         const rxsc::test::messages<int> on;
251         const rxsc::test::messages<std::string> o_on;
252 
253         auto xs = sc.make_hot_observable({
254             on.next(90, 1),
255             on.next(180, 2),
256             on.next(250, 3),
257             on.next(260, 4),
258             on.next(310, 5),
259             on.next(340, 6),
260             on.next(410, 7),
261             on.next(420, 8),
262             on.next(470, 9),
263             on.next(550, 10),
264             on.completed(590)
265         });
266 
267         auto ys = sc.make_hot_observable({
268             on.next(255, 50),
269             on.next(330, 100),
270             on.next(350, 50),
271             on.next(400, 90),
272             on.completed(900)
273         });
274 
275         WHEN("ints are split into windows"){
276             using namespace std::chrono;
277 
278             int wi = 0;
279 
280             auto res = w.start(
__anone54b1b630d02() 281                 [&]() {
282                     return xs
283                         .window_toggle(ys, [&](int){
284                             return ys;
285                         }, so)
286                         .map([wi](rxcpp::observable<int> w) mutable {
287                             auto ti = wi++;
288                             return w
289                                 .map([ti](int x){return std::to_string(ti) + " " + std::to_string(x);})
290                                 // forget type to workaround lambda deduction bug on msvc 2013
291                                 .as_dynamic();
292                         })
293                         .merge()
294                         // forget type to workaround lambda deduction bug on msvc 2013
295                         .as_dynamic();
296                 },
297                 420
298             );
299 
300             THEN("the output contains ints assigned to windows"){
301                 auto required = rxu::to_vector({
302                     o_on.next(261, "0 4"),
303                     o_on.next(311, "0 5"),
304                     o_on.next(341, "1 6"),
305                     o_on.next(411, "3 7")
306                 });
307                 auto actual = res.get_observer().messages();
308                 REQUIRE(required == actual);
309             }
310 
311             THEN("there was one subscription and one unsubscription to the observable"){
312                 auto required = rxu::to_vector({
313                     o_on.subscribe(200, 420)
314                 });
315                 auto actual = xs.subscriptions();
316                 REQUIRE(required == actual);
317             }
318         }
319     }
320 }
321 
322