1 #include "../test.h"
2 #include "rxcpp/operators/rx-combine_latest.hpp"
3 #include "rxcpp/operators/rx-map.hpp"
4 #include "rxcpp/operators/rx-take.hpp"
5 #include "rxcpp/operators/rx-observe_on.hpp"
6 #include "rxcpp/operators/rx-publish.hpp"
7 #include "rxcpp/operators/rx-ref_count.hpp"
8 
9 #include <sstream>
10 
11 SCENARIO("observe subscription", "[!hide]"){
12     GIVEN("observable of ints"){
13         WHEN("subscribe"){
14             auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();
15 
__anon1ab139c80102(rxcpp::subscriber<int> out)16             auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
17                 auto it = observers->insert(observers->end(), out);
18                 it->add([=](){
19                     observers->erase(it);
20                 });
21             });
22 
23         }
24     }
25 }
26 
27 static const int static_subscriptions = 10000;
28 
29 SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){
30     const int& subscriptions = static_subscriptions;
31     GIVEN("a for loop"){
32         WHEN("subscribe 100K times"){
33             using namespace std::chrono;
34             typedef steady_clock clock;
35 
36             auto sc = rxsc::make_current_thread();
37             auto w = sc.create_worker();
38             int runs = 10;
39 
__anon1ab139c80302(const rxsc::schedulable& self) 40             auto loop = [&](const rxsc::schedulable& self) {
41                 int c = 0;
42                 int n = 1;
43                 auto start = clock::now();
44                 for (int i = 0; i < subscriptions; i++) {
45                     rx::observable<>::just(1)
46                         .map([](int i) {
47                             std::stringstream serializer;
48                             serializer << i;
49                             return serializer.str();
50                         })
51                         .map([](const std::string& s) {
52                             int i;
53                             std::stringstream(s) >> i;
54                             return i;
55                         })
56                         .subscribe([&](int){
57                             ++c;
58                         });
59                 }
60                 auto finish = clock::now();
61                 auto msElapsed = duration_cast<milliseconds>(finish-start);
62                 std::cout << "loop subscribe map             : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
63 
64                 if (--runs > 0) {
65                     self();
66                 }
67             };
68 
69             w.schedule(loop);
70         }
71     }
72 }
73 
74 SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){
75     const int& subscriptions = static_subscriptions;
76     GIVEN("a for loop"){
77         WHEN("subscribe 100K times"){
78             using namespace std::chrono;
79             typedef steady_clock clock;
80 
81             auto sc = rxsc::make_current_thread();
82             auto w = sc.create_worker();
83             int runs = 10;
84 
__anon1ab139c80702(const rxsc::schedulable& self) 85             auto loop = [&](const rxsc::schedulable& self) {
86                 int c = 0;
87                 int n = 1;
88                 auto start = clock::now();
89                 for (int i = 0; i < subscriptions; i++) {
90                     rx::observable<>::just(1)
91                         .combine_latest([](int i, int j) {
92                             return i + j;
93                         }, rx::observable<>::just(2))
94                         .subscribe([&](int){
95                             ++c;
96                         });
97                 }
98                 auto finish = clock::now();
99                 auto msElapsed = duration_cast<milliseconds>(finish-start);
100                 std::cout << "loop subscribe combine_latest  : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
101 
102                 if (--runs > 0) {
103                     self();
104                 }
105             };
106 
107             w.schedule(loop);
108         }
109     }
110 }
111 
112 SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
113     GIVEN("range"){
114         WHEN("synchronized"){
115             using namespace std::chrono;
116             typedef steady_clock clock;
117 
118             auto sc = rxsc::make_current_thread();
119             auto w = sc.create_worker();
120 
121             auto es = rx::synchronize_event_loop();
122 
123             const int values = 10000;
124 
125             int runs = 10;
126 
__anon1ab139c80a02(const rxsc::schedulable& self) 127             auto loop = [&](const rxsc::schedulable& self) {
128                 std::atomic<int> c(0);
129                 int n = 1;
130                 auto liftrequirecompletion = [&](rx::subscriber<int> dest){
131                     auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
132                     std::get<2>(*completionstate).add([=](){
133                         if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
134                             abort();
135                         }
136                     });
137                     // VS2013 deduction issue requires dynamic (type-forgetting)
138                     return rx::make_subscriber<int>(
139                         std::get<2>(*completionstate),
140                         [=](int n){
141                             ++std::get<1>(*completionstate);
142                             std::get<2>(*completionstate).on_next(n);
143                         },
144                         [=](rxu::error_ptr){
145                             abort();
146                         },
147                         [=](){
148                             if (std::get<1>(*completionstate) != values) {
149                                 abort();
150                             }
151                             std::get<0>(*completionstate) = true;
152                             std::get<2>(*completionstate).on_completed();
153                         }).as_dynamic();
154                 };
155                 auto start = clock::now();
156                 auto ew = es.create_coordinator().get_worker();
157                 std::atomic<int> v(0);
158                 auto s0 = rxs::range(1, es)
159                     .take(values)
160                     .lift<int>(liftrequirecompletion)
161                     .as_dynamic()
162                     .publish_synchronized(es)
163                     .ref_count()
164                     .lift<int>(liftrequirecompletion)
165                     .subscribe(
166                         rx::make_observer_dynamic<int>(
167                         [&](int){
168                             ++v;
169                         },
170                         [&](){
171                             ++c;
172                         }));
173                 auto s1 = rxs::range(values + 1, es)
174                     .take(values)
175                     .lift<int>(liftrequirecompletion)
176                     .as_dynamic()
177                     .publish_synchronized(es)
178                     .ref_count()
179                     .lift<int>(liftrequirecompletion)
180                     .subscribe(
181                         rx::make_observer_dynamic<int>(
182                         [&](int){
183                             ++v;
184                         },
185                         [&](){
186                             ++c;
187                         }));
188                 auto s2 = rxs::range((values * 2) + 1, es)
189                     .take(values)
190                     .lift<int>(liftrequirecompletion)
191                     .as_dynamic()
192                     .publish_synchronized(es)
193                     .ref_count()
194                     .lift<int>(liftrequirecompletion)
195                     .subscribe(
196                         rx::make_observer_dynamic<int>(
197                         [&](int){
198                             ++v;
199                         },
200                         [&](){
201                             ++c;
202                         }));
203                 while(v != values * 3 || c != 3);
204                 s0.unsubscribe();
205                 s1.unsubscribe();
206                 s2.unsubscribe();
207                 auto finish = clock::now();
208                 auto msElapsed = duration_cast<milliseconds>(finish-start);
209                 std::cout << "range synchronized : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
210 
211                 if (--runs > 0) {
212                     self();
213                 }
214             };
215 
216             w.schedule(loop);
217         }
218     }
219 }
220 
221 SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
222     GIVEN("range"){
223         WHEN("observed on"){
224             using namespace std::chrono;
225             typedef steady_clock clock;
226 
227             auto sc = rxsc::make_current_thread();
228             auto w = sc.create_worker();
229 
230             auto es = rx::observe_on_event_loop();
231 
232             const int values = 10000;
233 
234             int runs = 10;
235 
__anon1ab139c81602(const rxsc::schedulable& self) 236             auto loop = [&](const rxsc::schedulable& self) {
237                 std::atomic<int> c(0);
238                 int n = 1;
239                 auto liftrequirecompletion = [&](rx::subscriber<int> dest){
240                     auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
241                     std::get<2>(*completionstate).add([=](){
242                         if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
243                             abort();
244                         }
245                     });
246                     // VS2013 deduction issue requires dynamic (type-forgetting)
247                     return rx::make_subscriber<int>(
248                         std::get<2>(*completionstate),
249                         [=](int n){
250                             ++std::get<1>(*completionstate);
251                             std::get<2>(*completionstate).on_next(n);
252                         },
253                         [=](rxu::error_ptr){
254                             abort();
255                         },
256                         [=](){
257                             if (std::get<1>(*completionstate) != values) {
258                                 abort();
259                             }
260                             std::get<0>(*completionstate) = true;
261                             std::get<2>(*completionstate).on_completed();
262                         }).as_dynamic();
263                 };
264                 auto start = clock::now();
265                 auto ew = es.create_coordinator().get_worker();
266                 std::atomic<int> v(0);
267                 auto s0 = rxs::range(1, es)
268                     .take(values)
269                     .lift<int>(liftrequirecompletion)
270                     .as_dynamic()
271                     .observe_on(es)
272                     .lift<int>(liftrequirecompletion)
273                     .subscribe(
274                         rx::make_observer_dynamic<int>(
275                         [&](int){
276                             ++v;
277                         },
278                         [&](){
279                             ++c;
280                         }));
281                 auto s1 = rxs::range(values + 1, es)
282                     .take(values)
283                     .lift<int>(liftrequirecompletion)
284                     .as_dynamic()
285                     .observe_on(es)
286                     .lift<int>(liftrequirecompletion)
287                     .subscribe(
288                         rx::make_observer_dynamic<int>(
289                         [&](int){
290                             ++v;
291                         },
292                         [&](){
293                             ++c;
294                         }));
295                 auto s2 = rxs::range((values * 2) + 1, es)
296                     .take(values)
297                     .lift<int>(liftrequirecompletion)
298                     .as_dynamic()
299                     .observe_on(es)
300                     .lift<int>(liftrequirecompletion)
301                     .subscribe(
302                         rx::make_observer_dynamic<int>(
303                         [&](int){
304                             ++v;
305                         },
306                         [&](){
307                             ++c;
308                         }));
309                 while(v != values * 3 || c != 3);
310                 s0.unsubscribe();
311                 s1.unsubscribe();
312                 s2.unsubscribe();
313                 auto finish = clock::now();
314                 auto msElapsed = duration_cast<milliseconds>(finish-start);
315                 std::cout << "range observe_on : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
316 
317                 if (--runs > 0) {
318                     self();
319                 }
320             };
321 
322             w.schedule(loop);
323         }
324     }
325 }
326 
327 SCENARIO("subscription traits", "[subscription][traits]"){
328     GIVEN("given some subscription types"){
329         auto es = rx::make_subscription();
330         rx::composite_subscription cs;
331         WHEN("tested"){
332             THEN("is_subscription value is true for empty subscription"){
333                 REQUIRE(rx::is_subscription<decltype(es)>::value);
334             }
335             THEN("is_subscription value is true for composite_subscription"){
336                 REQUIRE(rx::is_subscription<decltype(cs)>::value);
337             }
338         }
339     }
340 }
341 
342 SCENARIO("non-subscription traits", "[subscription][traits]"){
343     GIVEN("given some non-subscription types"){
__anon1ab139c82202()344         auto l = [](){};
345         int i = 0;
346         void* v = nullptr;
347         WHEN("tested"){
348             THEN("is_subscription value is false for lambda"){
349                 l();
350                 REQUIRE(!rx::is_subscription<decltype(l)>::value);
351             }
352             THEN("is_subscription value is false for int"){
353                 i = 0;
354                 REQUIRE(!rx::is_subscription<decltype(i)>::value);
355             }
356             THEN("is_subscription value is false for void*"){
357                 v = nullptr;
358                 REQUIRE(!rx::is_subscription<decltype(v)>::value);
359             }
360             THEN("is_subscription value is false for void"){
361                 REQUIRE(!rx::is_subscription<void>::value);
362             }
363         }
364     }
365 }
366 
367 SCENARIO("subscription static", "[subscription]"){
368     GIVEN("given a subscription"){
369         int i=0;
__anon1ab139c82302()370         auto s = rx::make_subscription([&i](){++i;});
371         WHEN("not used"){
372             THEN("is subscribed"){
373                 REQUIRE(s.is_subscribed());
374             }
375             THEN("i is 0"){
376                 REQUIRE(i == 0);
377             }
378         }
379         WHEN("used"){
380             THEN("is not subscribed when unsubscribed once"){
381                 s.unsubscribe();
382                 REQUIRE(!s.is_subscribed());
383             }
384             THEN("is not subscribed when unsubscribed twice"){
385                 s.unsubscribe();
386                 s.unsubscribe();
387                 REQUIRE(!s.is_subscribed());
388             }
389             THEN("i is 1 when unsubscribed once"){
390                 s.unsubscribe();
391                 REQUIRE(i == 1);
392             }
393             THEN("i is 1 when unsubscribed twice"){
394                 s.unsubscribe();
395                 s.unsubscribe();
396                 REQUIRE(i == 1);
397             }
398         }
399     }
400 }
401 
402 SCENARIO("subscription empty", "[subscription]"){
403     GIVEN("given an empty subscription"){
404         auto s = rx::make_subscription();
405         WHEN("not used"){
406             THEN("is not subscribed"){
407                 REQUIRE(!s.is_subscribed());
408             }
409         }
410         WHEN("used"){
411             THEN("is not subscribed when unsubscribed once"){
412                 s.unsubscribe();
413                 REQUIRE(!s.is_subscribed());
414             }
415             THEN("is not subscribed when unsubscribed twice"){
416                 s.unsubscribe();
417                 s.unsubscribe();
418                 REQUIRE(!s.is_subscribed());
419             }
420         }
421     }
422 }
423 
424 SCENARIO("subscription composite", "[subscription]"){
425     GIVEN("given a subscription"){
426         int i=0;
427         rx::composite_subscription s;
428         s.add(rx::make_subscription());
__anon1ab139c82402()429         s.add(rx::make_subscription([&i](){++i;}));
__anon1ab139c82502()430         s.add([&i](){++i;});
431         WHEN("not used"){
432             THEN("is subscribed"){
433                 REQUIRE(s.is_subscribed());
434             }
435             THEN("i is 0"){
436                 REQUIRE(i == 0);
437             }
438         }
439         WHEN("used"){
440             THEN("is not subscribed when unsubscribed once"){
441                 s.unsubscribe();
442                 REQUIRE(!s.is_subscribed());
443             }
444             THEN("is not subscribed when unsubscribed twice"){
445                 s.unsubscribe();
446                 s.unsubscribe();
447                 REQUIRE(!s.is_subscribed());
448             }
449             THEN("i is 2 when unsubscribed once"){
450                 s.unsubscribe();
451                 REQUIRE(i == 2);
452             }
453             THEN("i is 2 when unsubscribed twice"){
454                 s.unsubscribe();
455                 s.unsubscribe();
456                 REQUIRE(i == 2);
457             }
458         }
459     }
460 }
461 
462