#define RXCPP_SUBJECT_TEST_ASYNC 1

#include "../test.h"

#include <rxcpp/operators/rx-finally.hpp>

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <future>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Number of on_next calls each perf scenario in this file issues (10,000,000).
const int static_onnextcalls = 10000000;
// File-local counter that the perf scenarios bump through a pointer or a
// reference; each scenario resets it to 0 before timing.
static int aliased = 0;

13 SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){
14     const int& onnextcalls = static_onnextcalls;
15     GIVEN("a for loop"){
16         WHEN("locking mutex 100 million times"){
17             using namespace std::chrono;
18             typedef steady_clock clock;
19 
20             int c = 0;
21             int n = 1;
22             auto start = clock::now();
23             std::mutex m;
24             for (int i = 0; i < onnextcalls; i++) {
25                 std::unique_lock<std::mutex> guard(m);
26                 ++c;
27             }
28             auto finish = clock::now();
29             auto msElapsed = duration_cast<milliseconds>(finish-start);
30             std::cout << "loop mutex          : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
31 
32         }
33     }
34 }
35 
namespace syncwithvoid {
// Minimal synchronous subscriber used as a perf baseline: on_next forwards
// straight to the stored callable and returns void.
//   T      - value type passed to on_next
//   OnNext - callable invoked as onnext(v)
template<class T, class OnNext>
class sync_subscriber
{
public:
    OnNext onnext;
    bool issubscribed;
    // Stores a copy of the callable and starts in the subscribed state.
    explicit sync_subscriber(OnNext on)
        : onnext(on)
        , issubscribed(true)
    {
    }
    // True until unsubscribe() has been called. Does not modify the object.
    bool is_subscribed() const {return issubscribed;}
    // Clears the subscribed flag; nothing else is released.
    void unsubscribe() {issubscribed = false;}
    // Passes v to the callable. The subscribed flag is not consulted here;
    // callers check is_subscribed() themselves.
    void on_next(T v) {
        onnext(v);
    }
};
}
55 SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){
56     const int& onnextcalls = static_onnextcalls;
57     GIVEN("a for loop"){
58         WHEN("calling on_next 100 million times"){
59             using namespace std::chrono;
60             typedef steady_clock clock;
61 
62             auto c = std::addressof(aliased);
63             *c = 0;
64             int n = 1;
65             auto start = clock::now();
__anon6a6aec1e0102(int)66             auto onnext = [c](int){++*c;};
67             syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext);
68             for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) {
69                 scbr.on_next(i);
70             }
71             auto finish = clock::now();
72             auto msElapsed = duration_cast<milliseconds>(finish-start);
73             std::cout << "loop void           : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
74 
75         }
76     }
77 }
78 
namespace asyncwithready {
// ready is an immutable class.
// A default-constructed ready holds no hook and reports is_ready() == true.
// A ready built from a hook reports is_ready() == false; the producer then
// passes the work it wants resumed to then(), which forwards it to the hook.
class ready
{
public:
    typedef std::function<void()> onthen_type;
private:
    std::function<void(onthen_type)> setthen;
public:
    // Ready immediately: no hook is stored.
    ready() {}
    // Not ready: st receives the continuation handed to then().
    ready(std::function<void(onthen_type)> st) : setthen(st) {}
    // True when no hook is stored.
    bool is_ready() const {return !setthen;}
    // Hands ot to the stored hook. Calling this on an already-ready object
    // is treated as a programming error and aborts the process.
    void then(onthen_type ot) const {
        if (is_ready()) {
            abort();
        }
        setthen(ot);
    }
};
// Demo subscriber whose on_next returns a ready telling the producer whether
// it may keep pushing.
//   T      - value type passed to on_next
//   OnNext - callable invoked as onnext(v)
template<class T, class OnNext>
class async_subscriber
{
public:
    OnNext onnext;
    bool issubscribed;
    int count;
    // Stores a copy of the callable, starts subscribed with count == 0.
    explicit async_subscriber(OnNext on)
        : onnext(on)
        , issubscribed(true)
        , count(0)
    {
    }
    // True until unsubscribe() has been called. Does not modify the object.
    bool is_subscribed() const {return issubscribed;}
    // Clears the subscribed flag.
    void unsubscribe() {issubscribed = false;}
    // Forwards v to the callable and returns a ready for the producer.
    ready on_next(T v) {
        // push v onto queue

        // under some condition pop v off of queue and pass it on
        onnext(v);

        // for demo purposes
        // simulate queue full every 100000 items
        // NOTE(review): nothing in this class increments `count`, so this
        // branch is not taken and every call returns the ready value below.
        // Do not add an increment without also reviewing how callers resume
        // from the returned hook.
        if (count == 100000) {
            // 'queue is full'
            ready no([this](ready::onthen_type ot){
                // full version will sync producer and consumer (in producer push and consumer pop)
                // and decide when to restart the producer
                if (!this->count) {
                    ot();
                }
            });
            // set queue empty since the demo has no separate consumer thread
            count = 0;
            // 'queue is empty'
            return no;
        }
        // One shared ready-state object; a copy is returned on every call.
        static const ready yes;
        return yes;
    }
};
}
140 SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){
141     static const int& onnextcalls = static_onnextcalls;
142     GIVEN("a for loop"){
143         WHEN("calling on_next 100 million times"){
144             using namespace std::chrono;
145             typedef steady_clock clock;
146 
147             auto c = std::addressof(aliased);
148             *c = 0;
149             int n = 1;
150             auto start = clock::now();
__anon6a6aec1e0302(int)151             auto onnext = [&c](int){++*c;};
152             asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext);
153             asyncwithready::ready::onthen_type chunk;
154             int i = 0;
__anon6a6aec1e0402() 155             chunk = [&chunk, scbr, i]() mutable {
156                 for (; i < onnextcalls && scbr.is_subscribed(); i++) {
157                     auto controller = scbr.on_next(i);
158                     if (!controller.is_ready()) {
159                         controller.then(chunk);
160                         return;
161                     }
162                 }
163             };
164             chunk();
165             auto finish = clock::now();
166             auto msElapsed = duration_cast<milliseconds>(finish-start);
167             std::cout << "loop ready          : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
168 
169         }
170     }
171 }
172 
namespace asyncwithfuture {
// Empty value type carried by the future returned from on_next.
class unit {};
// Demo subscriber whose on_next returns a std::future<unit> that the producer
// can wait on before pushing the next value.
//   T      - value type passed to on_next
//   OnNext - callable invoked as onnext(v)
template<class T, class OnNext>
class async_subscriber
{
public:
    OnNext onnext;
    bool issubscribed;
    // Stores a copy of the callable and starts in the subscribed state.
    explicit async_subscriber(OnNext on)
        : onnext(on)
        , issubscribed(true)
    {
    }
    // True until unsubscribe() has been called. Does not modify the object.
    bool is_subscribed() const {return issubscribed;}
    // Clears the subscribed flag.
    void unsubscribe() {issubscribed = false;}
    // Fulfils a fresh promise, forwards v to the callable, and returns the
    // promise's future; the value is already set when the future is handed back.
    std::future<unit> on_next(T v) {
        std::promise<unit> ready;
        ready.set_value(unit());
        onnext(v);
        return ready.get_future();
    }
};
}
194 SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){
195     const int& onnextcalls = static_onnextcalls;
196     GIVEN("a for loop"){
197         WHEN("calling on_next 100 million times"){
198             using namespace std::chrono;
199             typedef steady_clock clock;
200 
201             auto c = std::addressof(aliased);
202             *c = 0;
203             int n = 1;
204             auto start = clock::now();
__anon6a6aec1e0502(int)205             auto onnext = [&c](int){++*c;};
206             asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext);
207             for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) {
208                 auto isready = scbr.on_next(i);
209                 if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
210                     isready.wait();
211                 }
212             }
213             auto finish = clock::now();
214             auto msElapsed = duration_cast<milliseconds>(finish-start);
215             std::cout << "loop future<unit>   : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
216 
217         }
218     }
219 }
220 
221 SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){
222     const int& onnextcalls = static_onnextcalls;
223     GIVEN("a for loop"){
224         WHEN("observing 100 million ints"){
225             using namespace std::chrono;
226             typedef steady_clock clock;
227 
228             static int& c = aliased;
229             int n = 1;
230 
231             c = 0;
232             auto start = clock::now();
233             auto o = rx::make_observer<int>(
__anon6a6aec1e0602(int)234                 [](int){++c;},
__anon6a6aec1e0702(rxu::error_ptr)235                 [](rxu::error_ptr){abort();});
236             for (int i = 0; i < onnextcalls; i++) {
237                 o.on_next(i);
238             }
239             o.on_completed();
240             auto finish = clock::now();
241             auto msElapsed = duration_cast<milliseconds>(finish-start);
242             std::cout << "loop -> observer    : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
243         }
244     }
245 }
246 
247 SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){
248     const int& onnextcalls = static_onnextcalls;
249     GIVEN("a for loop"){
250         WHEN("observing 100 million ints"){
251             using namespace std::chrono;
252             typedef steady_clock clock;
253 
254             static int& c = aliased;
255             int n = 1;
256 
257             c = 0;
258             auto start = clock::now();
259             auto o = rx::make_subscriber<int>(
__anon6a6aec1e0802(int)260                 [](int){++c;},
__anon6a6aec1e0902(rxu::error_ptr)261                 [](rxu::error_ptr){abort();});
262             for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) {
263                 o.on_next(i);
264             }
265             o.on_completed();
266             auto finish = clock::now();
267             auto msElapsed = duration_cast<milliseconds>(finish-start);
268             std::cout << "loop -> subscriber  : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
269         }
270     }
271 }
272 
273 SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){
274     const int& onnextcalls = static_onnextcalls;
275     GIVEN("a range"){
276         WHEN("observing 100 million ints"){
277             using namespace std::chrono;
278             typedef steady_clock clock;
279 
280             static int& c = aliased;
281             int n = 1;
282 
283             c = 0;
284             auto start = clock::now();
285 
286             rxs::range<int>(1, onnextcalls).subscribe(
__anon6a6aec1e0a02(int)287                 [](int){
288                     ++c;
289                 },
__anon6a6aec1e0b02(rxu::error_ptr)290                 [](rxu::error_ptr){abort();});
291 
292             auto finish = clock::now();
293             auto msElapsed = duration_cast<milliseconds>(finish-start);
294             std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
295         }
296     }
297 }
298 
299 SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){
300     static const int& onnextcalls = static_onnextcalls;
301     GIVEN("a for loop and a subject"){
302         WHEN("multicasting a million ints"){
303             using namespace std::chrono;
304             typedef steady_clock clock;
305 
306             for (int n = 0; n < 10; n++)
307             {
308                 auto p = std::make_shared<int>(0);
309                 auto c = std::make_shared<int>(0);
310                 rxsub::subject<int> sub;
311 
312 #if RXCPP_SUBJECT_TEST_ASYNC
313                 std::vector<std::future<int>> f(n);
314                 std::atomic<int> asyncUnsubscriptions{0};
315 #endif
316 
317                 auto o = sub.get_subscriber();
318 
__anon6a6aec1e0c02()319                 o.add(rx::make_subscription([c, n](){
320                     auto expected = n * onnextcalls;
321                     REQUIRE(*c == expected);
322                 }));
323 
324                 for (int i = 0; i < n; i++) {
325 #if RXCPP_SUBJECT_TEST_ASYNC
__anon6a6aec1e0d02() 326                     f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
327                         auto source = sub.get_observable();
328                         while(o.is_subscribed()) {
329                             std::this_thread::sleep_for(std::chrono::milliseconds(100));
330                             rx::composite_subscription cs;
331                             source
332                             .finally([&asyncUnsubscriptions](){
333                                 ++asyncUnsubscriptions;})
334                             .subscribe(
335                                 rx::make_subscriber<int>(
336                                 cs,
337                                 [cs](int){
338                                     cs.unsubscribe();
339                                 },
340                                 [](rxu::error_ptr){abort();}));
341                         }
342                         return 0;
343                     });
344 #endif
345                     sub.get_observable().subscribe(
__anon6a6aec1e1102(int)346                         [c, p](int){
347                             ++(*c);
348                         },
__anon6a6aec1e1202(rxu::error_ptr)349                         [](rxu::error_ptr){abort();});
350                 }
351 
352                 auto start = clock::now();
353                 for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) {
354 #if RXCPP_DEBUG_SUBJECT_RACE
355                     if (*p != *c) abort();
356                     (*p) += n;
357 #endif
358                     o.on_next(i);
359                 }
360                 o.on_completed();
361                 auto finish = clock::now();
362                 auto msElapsed = duration_cast<milliseconds>(finish-start);
363                 std::cout << "loop -> subject     : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
364 #if RXCPP_SUBJECT_TEST_ASYNC
365                 std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
366 #endif
367                 std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
368             }
369         }
370     }
371 }
372 
373 SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){
374     static const int& onnextcalls = static_onnextcalls;
375     GIVEN("a range and a subject"){
376         WHEN("multicasting a million ints"){
377             using namespace std::chrono;
378             typedef steady_clock clock;
379             for (int n = 0; n < 10; n++)
380             {
381                 auto p = std::make_shared<int>(0);
382                 auto c = std::make_shared<int>(0);
383                 rxsub::subject<int> sub;
384 
385 #if RXCPP_SUBJECT_TEST_ASYNC
386                 std::vector<std::future<int>> f(n);
387                 std::atomic<int> asyncUnsubscriptions{0};
388 #endif
389 
390                 auto o = sub.get_subscriber();
391 
__anon6a6aec1e1302()392                 o.add(rx::make_subscription([c, n](){
393                     auto expected = n * onnextcalls;
394                     REQUIRE(*c == expected);
395                 }));
396 
397                 for (int i = 0; i < n; i++) {
398 #if RXCPP_SUBJECT_TEST_ASYNC
__anon6a6aec1e1402() 399                     f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
400                         while(o.is_subscribed()) {
401                             std::this_thread::sleep_for(std::chrono::milliseconds(100));
402                             rx::composite_subscription cs;
403                             sub.get_observable()
404                             .finally([&asyncUnsubscriptions](){
405                                 ++asyncUnsubscriptions;})
406                             .subscribe(cs,
407                                 [cs](int){
408                                     cs.unsubscribe();
409                                 },
410                                 [](rxu::error_ptr){abort();});
411                         }
412                         return 0;
413                     });
414 #endif
415                     sub.get_observable()
416                         .subscribe(
__anon6a6aec1e1802(int)417                             [c, p](int){
418                                ++(*c);
419                             },
__anon6a6aec1e1902(rxu::error_ptr)420                             [](rxu::error_ptr){abort();}
421                         );
422                 }
423 
424                 auto start = clock::now();
425                 rxs::range<int>(1, onnextcalls)
426 #if RXCPP_DEBUG_SUBJECT_RACE
__anon6a6aec1e1a02(int)427                     .filter([c, p, n](int){
428                         if (*p != *c) abort();
429                         (*p) += n;
430                         return true;
431                     })
432 #endif
433                     .subscribe(o);
434                 auto finish = clock::now();
435                 auto msElapsed = duration_cast<milliseconds>(finish-start);
436                 std::cout << "range -> subject    : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
437 #if RXCPP_SUBJECT_TEST_ASYNC
438                 std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
439 #endif
440                 std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
441             }
442         }
443     }
444 }
445 
446 
447 SCENARIO("subject - infinite source", "[subject][subjects]"){
448     GIVEN("a subject and an infinite source"){
449 
450         auto sc = rxsc::make_test();
451         auto w = sc.create_worker();
452         const rxsc::test::messages<int> on;
453         const rxsc::test::messages<bool> check;
454 
455         auto xs = sc.make_hot_observable({
456             on.next(70, 1),
457             on.next(110, 2),
458             on.next(220, 3),
459             on.next(270, 4),
460             on.next(340, 5),
461             on.next(410, 6),
462             on.next(520, 7),
463             on.next(630, 8),
464             on.next(710, 9),
465             on.next(870, 10),
466             on.next(940, 11),
467             on.next(1020, 12)
468         });
469 
470         rxsub::subject<int> s;
471 
472         auto results1 = w.make_subscriber<int>();
473 
474         auto results2 = w.make_subscriber<int>();
475 
476         auto results3 = w.make_subscriber<int>();
477 
478         WHEN("multicasting an infinite source"){
479 
480             auto checks = rxu::to_vector({
481                 check.next(0, false)
482             });
483 
__anon6a6aec1e1b02(long at) 484             auto record = [&s, &check, &checks](long at) -> void {
485                 checks.push_back(check.next(at, s.has_observers()));
486             };
487 
488             auto o = s.get_subscriber();
489 
__anon6a6aec1e1c02(const rxsc::schedulable&)490             w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){
491                 s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);});
__anon6a6aec1e1d02(const rxsc::schedulable&)492             w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){
493                 xs.subscribe(o); record(200);});
__anon6a6aec1e1e02(const rxsc::schedulable&)494             w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){
495                 o.unsubscribe(); record(1000);});
496 
__anon6a6aec1e1f02(const rxsc::schedulable&)497             w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){
498                 s.get_observable().subscribe(results1); record(300);});
__anon6a6aec1e2002(const rxsc::schedulable&)499             w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){
500                 s.get_observable().subscribe(results2); record(400);});
__anon6a6aec1e2102(const rxsc::schedulable&)501             w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){
502                 s.get_observable().subscribe(results3); record(900);});
503 
__anon6a6aec1e2202(const rxsc::schedulable&)504             w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){
505                 results1.unsubscribe(); record(600);});
__anon6a6aec1e2302(const rxsc::schedulable&)506             w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){
507                 results2.unsubscribe(); record(700);});
__anon6a6aec1e2402(const rxsc::schedulable&)508             w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){
509                 results1.unsubscribe(); record(800);});
__anon6a6aec1e2502(const rxsc::schedulable&)510             w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){
511                 results3.unsubscribe(); record(950);});
512 
513             w.start();
514 
515             THEN("result1 contains expected messages"){
516                 auto required = rxu::to_vector({
517                     on.next(340, 5),
518                     on.next(410, 6),
519                     on.next(520, 7)
520                 });
521                 auto actual = results1.get_observer().messages();
522                 REQUIRE(required == actual);
523             }
524 
525             THEN("result2 contains expected messages"){
526                 auto required = rxu::to_vector({
527                     on.next(410, 6),
528                     on.next(520, 7),
529                     on.next(630, 8)
530                 });
531                 auto actual = results2.get_observer().messages();
532                 REQUIRE(required == actual);
533             }
534 
535             THEN("result3 contains expected messages"){
536                 auto required = rxu::to_vector({
537                     on.next(940, 11)
538                 });
539                 auto actual = results3.get_observer().messages();
540                 REQUIRE(required == actual);
541             }
542 
543             THEN("checks contains expected messages"){
544                 auto required = rxu::to_vector({
545                     check.next(100, false),
546                     check.next(200, false),
547                     check.next(300, true),
548                     check.next(400, true),
549                     check.next(600, true),
550                     check.next(700, false),
551                     check.next(800, false),
552                     check.next(900, true),
553                     check.next(950, false),
554                     check.next(1000, false)
555                 });
556                 auto actual = checks;
557                 REQUIRE(required == actual);
558             }
559 
560         }
561     }
562 }
563 
564 SCENARIO("subject - finite source", "[subject][subjects]"){
565     GIVEN("a subject and an finite source"){
566 
567         auto sc = rxsc::make_test();
568         auto w = sc.create_worker();
569         const rxsc::test::messages<int> on;
570 
571         auto xs = sc.make_hot_observable({
572             on.next(70, 1),
573             on.next(110, 2),
574             on.next(220, 3),
575             on.next(270, 4),
576             on.next(340, 5),
577             on.next(410, 6),
578             on.next(520, 7),
579             on.completed(630),
580             on.next(640, 9),
581             on.completed(650),
582             on.error(660, std::runtime_error("error on unsubscribed stream"))
583         });
584 
585         rxsub::subject<int> s;
586 
587         auto results1 = w.make_subscriber<int>();
588 
589         auto results2 = w.make_subscriber<int>();
590 
591         auto results3 = w.make_subscriber<int>();
592 
593         WHEN("multicasting an infinite source"){
594 
595             auto o = s.get_subscriber();
596 
__anon6a6aec1e2602(const rxsc::schedulable&)597             w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){
598                 s = rxsub::subject<int>(); o = s.get_subscriber();});
__anon6a6aec1e2702(const rxsc::schedulable&)599             w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){
600                 xs.subscribe(o);});
__anon6a6aec1e2802(const rxsc::schedulable&)601             w.schedule_absolute(1000, [&o](const rxsc::schedulable&){
602                 o.unsubscribe();});
603 
__anon6a6aec1e2902(const rxsc::schedulable&)604             w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){
605                 s.get_observable().subscribe(results1);});
__anon6a6aec1e2a02(const rxsc::schedulable&)606             w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){
607                 s.get_observable().subscribe(results2);});
__anon6a6aec1e2b02(const rxsc::schedulable&)608             w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){
609                 s.get_observable().subscribe(results3);});
610 
__anon6a6aec1e2c02(const rxsc::schedulable&)611             w.schedule_absolute(600, [&results1](const rxsc::schedulable&){
612                 results1.unsubscribe();});
__anon6a6aec1e2d02(const rxsc::schedulable&)613             w.schedule_absolute(700, [&results2](const rxsc::schedulable&){
614                 results2.unsubscribe();});
__anon6a6aec1e2e02(const rxsc::schedulable&)615             w.schedule_absolute(800, [&results1](const rxsc::schedulable&){
616                 results1.unsubscribe();});
__anon6a6aec1e2f02(const rxsc::schedulable&)617             w.schedule_absolute(950, [&results3](const rxsc::schedulable&){
618                 results3.unsubscribe();});
619 
620             w.start();
621 
622             THEN("result1 contains expected messages"){
623                 auto required = rxu::to_vector({
624                     on.next(340, 5),
625                     on.next(410, 6),
626                     on.next(520, 7)
627                 });
628                 auto actual = results1.get_observer().messages();
629                 REQUIRE(required == actual);
630             }
631 
632             THEN("result2 contains expected messages"){
633                 auto required = rxu::to_vector({
634                     on.next(410, 6),
635                     on.next(520, 7),
636                     on.completed(630)
637                 });
638                 auto actual = results2.get_observer().messages();
639                 REQUIRE(required == actual);
640             }
641 
642             THEN("result3 contains expected messages"){
643                 auto required = rxu::to_vector({
644                     on.completed(900)
645                 });
646                 auto actual = results3.get_observer().messages();
647                 REQUIRE(required == actual);
648             }
649 
650         }
651     }
652 }
653 
654 
655 SCENARIO("subject - on_error in source", "[subject][subjects]"){
656     GIVEN("a subject and a source with an error"){
657 
658         auto sc = rxsc::make_test();
659         auto w = sc.create_worker();
660         const rxsc::test::messages<int> on;
661 
662         std::runtime_error ex("subject on_error in stream");
663 
664         auto xs = sc.make_hot_observable({
665             on.next(70, 1),
666             on.next(110, 2),
667             on.next(220, 3),
668             on.next(270, 4),
669             on.next(340, 5),
670             on.next(410, 6),
671             on.next(520, 7),
672             on.error(630, ex),
673             on.next(640, 9),
674             on.completed(650),
675             on.error(660, std::runtime_error("error on unsubscribed stream"))
676         });
677 
678         rxsub::subject<int> s;
679 
680         auto results1 = w.make_subscriber<int>();
681 
682         auto results2 = w.make_subscriber<int>();
683 
684         auto results3 = w.make_subscriber<int>();
685 
686         WHEN("multicasting an infinite source"){
687 
688             auto o = s.get_subscriber();
689 
__anon6a6aec1e3002(const rxsc::schedulable&)690             w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){
691                 s = rxsub::subject<int>(); o = s.get_subscriber();});
__anon6a6aec1e3102(const rxsc::schedulable&)692             w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){
693                 xs.subscribe(o);});
__anon6a6aec1e3202(const rxsc::schedulable&)694             w.schedule_absolute(1000, [&o](const rxsc::schedulable&){
695                 o.unsubscribe();});
696 
__anon6a6aec1e3302(const rxsc::schedulable&)697             w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){
698                 s.get_observable().subscribe(results1);});
__anon6a6aec1e3402(const rxsc::schedulable&)699             w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){
700                 s.get_observable().subscribe(results2);});
__anon6a6aec1e3502(const rxsc::schedulable&)701             w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){
702                 s.get_observable().subscribe(results3);});
703 
__anon6a6aec1e3602(const rxsc::schedulable&)704             w.schedule_absolute(600, [&results1](const rxsc::schedulable&){
705                 results1.unsubscribe();});
__anon6a6aec1e3702(const rxsc::schedulable&)706             w.schedule_absolute(700, [&results2](const rxsc::schedulable&){
707                 results2.unsubscribe();});
__anon6a6aec1e3802(const rxsc::schedulable&)708             w.schedule_absolute(800, [&results1](const rxsc::schedulable&){
709                 results1.unsubscribe();});
__anon6a6aec1e3902(const rxsc::schedulable&)710             w.schedule_absolute(950, [&results3](const rxsc::schedulable&){
711                 results3.unsubscribe();});
712 
713             w.start();
714 
715             THEN("result1 contains expected messages"){
716                 auto required = rxu::to_vector({
717                     on.next(340, 5),
718                     on.next(410, 6),
719                     on.next(520, 7)
720                 });
721                 auto actual = results1.get_observer().messages();
722                 REQUIRE(required == actual);
723             }
724 
725             THEN("result2 contains expected messages"){
726                 auto required = rxu::to_vector({
727                     on.next(410, 6),
728                     on.next(520, 7),
729                     on.error(630, ex)
730                 });
731                 auto actual = results2.get_observer().messages();
732                 REQUIRE(required == actual);
733             }
734 
735             THEN("result3 contains expected messages"){
736                 auto required = rxu::to_vector({
737                     on.error(900, ex)
738                 });
739                 auto actual = results3.get_observer().messages();
740                 REQUIRE(required == actual);
741             }
742 
743         }
744     }
745 }
746