1 #include "../test.h"
2 #include <rxcpp/operators/rx-reduce.hpp>
3 #include <rxcpp/operators/rx-merge.hpp>
4 #include <rxcpp/operators/rx-observe_on.hpp>
5 
// Total number of values pushed through merge by each perf scenario below.
const int static_onnextcalls = 1000 * 1000;
7 
8 
9 SCENARIO("synchronize merge ranges", "[!hide][range][synchronize][merge][perf]"){
10     const int& onnextcalls = static_onnextcalls;
11     GIVEN("some ranges"){
12         WHEN("generating ints"){
13             using namespace std::chrono;
14             typedef steady_clock clock;
15 
16             auto so = rx::synchronize_event_loop();
17 
18             int n = 1;
19             auto sectionCount = onnextcalls / 3;
20             auto start = clock::now();
21             int c = rxs::range(0, sectionCount - 1, 1, so)
22                 .merge(
23                     so,
24                     rxs::range(sectionCount, (sectionCount * 2) - 1, 1, so),
25                     rxs::range(sectionCount * 2, onnextcalls - 1, 1, so))
26                 .as_blocking()
27                 .count();
28 
29             auto finish = clock::now();
30             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
31                    duration_cast<milliseconds>(start.time_since_epoch());
32             std::cout << "merge sync ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl;
33         }
34     }
35 }
36 
37 SCENARIO("observe_on merge ranges", "[!hide][range][observe_on][merge][perf]"){
38     const int& onnextcalls = static_onnextcalls;
39     GIVEN("some ranges"){
40         WHEN("generating ints"){
41             using namespace std::chrono;
42             typedef steady_clock clock;
43 
44             auto so = rx::observe_on_event_loop();
45 
46             int n = 1;
47             auto sectionCount = onnextcalls / 3;
48             auto start = clock::now();
49             int c = rxs::range(0, sectionCount - 1, 1, so)
50                 .merge(
51                     so,
52                     rxs::range(sectionCount, (sectionCount * 2) - 1, 1, so),
53                     rxs::range(sectionCount * 2, onnextcalls - 1, 1, so))
54                 .as_blocking()
55                 .count();
56 
57             auto finish = clock::now();
58             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
59                    duration_cast<milliseconds>(start.time_since_epoch());
60             std::cout << "merge observe_on ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl;
61         }
62     }
63 }
64 
65 SCENARIO("serialize merge ranges", "[!hide][range][serialize][merge][perf]"){
66     const int& onnextcalls = static_onnextcalls;
67     GIVEN("some ranges"){
68         WHEN("generating ints"){
69             using namespace std::chrono;
70             typedef steady_clock clock;
71 
72             auto so = rx::serialize_event_loop();
73 
74             int n = 1;
75             auto sectionCount = onnextcalls / 3;
76             auto start = clock::now();
77             int c = rxs::range(0, sectionCount - 1, 1, so)
78                 .merge(
79                     so,
80                     rxs::range(sectionCount, (sectionCount * 2) - 1, 1, so),
81                     rxs::range(sectionCount * 2, onnextcalls - 1, 1, so))
82                 .as_blocking()
83                 .count();
84 
85             auto finish = clock::now();
86             auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) -
87                    duration_cast<milliseconds>(start.time_since_epoch());
88             std::cout << "merge serial ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl;
89         }
90     }
91 }
92 
93 SCENARIO("merge completes", "[merge][join][operators]"){
94     GIVEN("1 hot observable with 3 cold observables of ints."){
95         auto sc = rxsc::make_test();
96         auto w = sc.create_worker();
97         const rxsc::test::messages<int> on;
98         const rxsc::test::messages<rx::observable<int>> o_on;
99 
100         auto ys1 = sc.make_cold_observable({
101             on.next(10, 101),
102             on.next(20, 102),
103             on.next(110, 103),
104             on.next(120, 104),
105             on.next(210, 105),
106             on.next(220, 106),
107             on.completed(230)
108         });
109 
110         auto ys2 = sc.make_cold_observable({
111             on.next(10, 201),
112             on.next(20, 202),
113             on.next(30, 203),
114             on.next(40, 204),
115             on.completed(50)
116         });
117 
118         auto ys3 = sc.make_cold_observable({
119             on.next(10, 301),
120             on.next(20, 302),
121             on.next(30, 303),
122             on.next(40, 304),
123             on.next(120, 305),
124             on.completed(150)
125         });
126 
127         auto xs = sc.make_hot_observable({
128             o_on.next(300, ys1),
129             o_on.next(400, ys2),
130             o_on.next(500, ys3),
131             o_on.completed(600)
132         });
133 
134         WHEN("each int is merged"){
135 
136             auto res = w.start(
__anon6da9c71a0102() 137                 [&]() {
138                     return xs
139                         | rxo::merge()
140                         // forget type to workaround lambda deduction bug on msvc 2013
141                         | rxo::as_dynamic();
142                 }
143             );
144 
145             THEN("the output contains merged ints"){
146                 auto required = rxu::to_vector({
147                     on.next(310, 101),
148                     on.next(320, 102),
149                     on.next(410, 103),
150                     on.next(410, 201),
151                     on.next(420, 104),
152                     on.next(420, 202),
153                     on.next(430, 203),
154                     on.next(440, 204),
155                     on.next(510, 105),
156                     on.next(510, 301),
157                     on.next(520, 106),
158                     on.next(520, 302),
159                     on.next(530, 303),
160                     on.next(540, 304),
161                     on.next(620, 305),
162                     on.completed(650)
163                 });
164                 auto actual = res.get_observer().messages();
165                 REQUIRE(required == actual);
166             }
167 
168             THEN("there was one subscription and one unsubscription to the xs"){
169                 auto required = rxu::to_vector({
170                     on.subscribe(200, 600)
171                 });
172                 auto actual = xs.subscriptions();
173                 REQUIRE(required == actual);
174             }
175 
176             THEN("there was one subscription and one unsubscription to the ys1"){
177                 auto required = rxu::to_vector({
178                     on.subscribe(300, 530)
179                 });
180                 auto actual = ys1.subscriptions();
181                 REQUIRE(required == actual);
182             }
183 
184             THEN("there was one subscription and one unsubscription to the ys2"){
185                 auto required = rxu::to_vector({
186                     on.subscribe(400, 450)
187                 });
188                 auto actual = ys2.subscriptions();
189                 REQUIRE(required == actual);
190             }
191 
192             THEN("there was one subscription and one unsubscription to the ys3"){
193                 auto required = rxu::to_vector({
194                     on.subscribe(500, 650)
195                 });
196                 auto actual = ys3.subscriptions();
197                 REQUIRE(required == actual);
198             }
199         }
200     }
201 }
202 
203 SCENARIO("variadic merge completes", "[merge][join][operators]"){
204     GIVEN("1 hot observable with 3 cold observables of ints."){
205         auto sc = rxsc::make_test();
206         auto w = sc.create_worker();
207         const rxsc::test::messages<int> on;
208         const rxsc::test::messages<rx::observable<int>> o_on;
209 
210         auto ys1 = sc.make_cold_observable({
211             on.next(10, 101),
212             on.next(20, 102),
213             on.next(110, 103),
214             on.next(120, 104),
215             on.next(210, 105),
216             on.next(220, 106),
217             on.completed(230)
218         });
219 
220         auto ys2 = sc.make_cold_observable({
221             on.next(10, 201),
222             on.next(20, 202),
223             on.next(30, 203),
224             on.next(40, 204),
225             on.completed(50)
226         });
227 
228         auto ys3 = sc.make_cold_observable({
229             on.next(10, 301),
230             on.next(20, 302),
231             on.next(30, 303),
232             on.next(40, 304),
233             on.next(120, 305),
234             on.completed(150)
235         });
236 
237         WHEN("each int is merged"){
238 
239             auto res = w.start(
__anon6da9c71a0202() 240                 [&]() {
241                     return ys1
242                         .merge(ys2, ys3);
243                 }
244             );
245 
246             THEN("the output contains merged ints"){
247                 auto required = rxu::to_vector({
248                     on.next(210, 101),
249                     on.next(210, 201),
250                     on.next(210, 301),
251                     on.next(220, 102),
252                     on.next(220, 202),
253                     on.next(220, 302),
254                     on.next(230, 203),
255                     on.next(230, 303),
256                     on.next(240, 204),
257                     on.next(240, 304),
258                     on.next(310, 103),
259                     on.next(320, 104),
260                     on.next(320, 305),
261                     on.next(410, 105),
262                     on.next(420, 106),
263                     on.completed(430)
264                 });
265                 auto actual = res.get_observer().messages();
266                 REQUIRE(required == actual);
267             }
268 
269             THEN("there was one subscription and one unsubscription to the ys1"){
270                 auto required = rxu::to_vector({
271                     on.subscribe(200, 430)
272                 });
273                 auto actual = ys1.subscriptions();
274                 REQUIRE(required == actual);
275             }
276 
277             THEN("there was one subscription and one unsubscription to the ys2"){
278                 auto required = rxu::to_vector({
279                     on.subscribe(200, 250)
280                 });
281                 auto actual = ys2.subscriptions();
282                 REQUIRE(required == actual);
283             }
284 
285             THEN("there was one subscription and one unsubscription to the ys3"){
286                 auto required = rxu::to_vector({
287                     on.subscribe(200, 350)
288                 });
289                 auto actual = ys3.subscriptions();
290                 REQUIRE(required == actual);
291             }
292         }
293     }
294 }
295