1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 class test_type : public scheduler_interface
17 {
18 public:
19 
20     typedef scheduler_interface::clock_type clock_type;
21 
22     struct test_type_state : public virtual_time<long, long>
23     {
24         typedef virtual_time<long, long> base;
25 
26         using base::schedule_absolute;
27         using base::schedule_relative;
28 
nowrxcpp::schedulers::detail::test_type::test_type_state29         clock_type::time_point now() const {
30             return to_time_point(clock_now);
31         }
32 
schedule_absoluterxcpp::schedulers::detail::test_type::test_type_state33         virtual void schedule_absolute(long when, const schedulable& a) const
34         {
35             if (when <= base::clock_now)
36                 when = base::clock_now + 1;
37 
38             return base::schedule_absolute(when, a);
39         }
40 
addrxcpp::schedulers::detail::test_type::test_type_state41         virtual long add(long absolute, long relative) const
42         {
43             return absolute + relative;
44         }
45 
to_time_pointrxcpp::schedulers::detail::test_type::test_type_state46         virtual clock_type::time_point to_time_point(long absolute) const
47         {
48             return clock_type::time_point(std::chrono::milliseconds(absolute));
49         }
50 
to_relativerxcpp::schedulers::detail::test_type::test_type_state51         virtual long to_relative(clock_type::duration d) const
52         {
53             return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
54         }
55     };
56 
57 private:
58     mutable std::shared_ptr<test_type_state> state;
59 
60 public:
61     struct test_type_worker : public worker_interface
62     {
63         mutable std::shared_ptr<test_type_state> state;
64 
65         typedef test_type_state::absolute absolute;
66         typedef test_type_state::relative relative;
67 
test_type_workerrxcpp::schedulers::detail::test_type::test_type_worker68         test_type_worker(std::shared_ptr<test_type_state> st)
69             : state(std::move(st))
70         {
71         }
72 
nowrxcpp::schedulers::detail::test_type::test_type_worker73         virtual clock_type::time_point now() const {
74             return state->now();
75         }
76 
schedulerxcpp::schedulers::detail::test_type::test_type_worker77         virtual void schedule(const schedulable& scbl) const {
78             state->schedule_absolute(state->clock(), scbl);
79         }
80 
schedulerxcpp::schedulers::detail::test_type::test_type_worker81         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
82             state->schedule_relative(state->to_relative(when - now()), scbl);
83         }
84 
schedule_absoluterxcpp::schedulers::detail::test_type::test_type_worker85         void schedule_absolute(absolute when, const schedulable& scbl) const {
86             state->schedule_absolute(when, scbl);
87         }
88 
schedule_relativerxcpp::schedulers::detail::test_type::test_type_worker89         void schedule_relative(relative when, const schedulable& scbl) const {
90             state->schedule_relative(when, scbl);
91         }
92 
is_enabledrxcpp::schedulers::detail::test_type::test_type_worker93         bool is_enabled() const {return state->is_enabled();}
clockrxcpp::schedulers::detail::test_type::test_type_worker94         absolute clock() const {return state->clock();}
95 
startrxcpp::schedulers::detail::test_type::test_type_worker96         void start() const
97         {
98             state->start();
99         }
100 
stoprxcpp::schedulers::detail::test_type::test_type_worker101         void stop() const
102         {
103             state->stop();
104         }
105 
advance_torxcpp::schedulers::detail::test_type::test_type_worker106         void advance_to(absolute time) const
107         {
108             state->advance_to(time);
109         }
110 
advance_byrxcpp::schedulers::detail::test_type::test_type_worker111         void advance_by(relative time) const
112         {
113             state->advance_by(time);
114         }
115 
sleeprxcpp::schedulers::detail::test_type::test_type_worker116         void sleep(relative time) const
117         {
118             state->sleep(time);
119         }
120 
121         template<class T>
122         subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
123     };
124 
125 public:
test_type()126     test_type()
127         : state(std::make_shared<test_type_state>())
128     {
129     }
130 
now() const131     virtual clock_type::time_point now() const {
132         return state->now();
133     }
134 
create_worker(composite_subscription cs) const135     virtual worker create_worker(composite_subscription cs) const {
136         return worker(cs, std::make_shared<test_type_worker>(state));
137     }
138 
is_enabled() const139     bool is_enabled() const {return state->is_enabled();}
clock()140     long clock() {
141         return state->clock();
142     }
143 
to_time_point(long absolute) const144     clock_type::time_point to_time_point(long absolute) const {
145         return state->to_time_point(absolute);
146     }
147 
create_test_type_worker_interface() const148     std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
149         return std::make_shared<test_type_worker>(state);
150     }
151 
152     template<class T>
153     rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
154 
155     template<class T>
156     rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
157 };
158 
159 template<class T>
160 class mock_observer
161     : public rxt::detail::test_subject_base<T>
162 {
163     typedef typename rxn::notification<T> notification_type;
164     typedef rxn::recorded<typename notification_type::type> recorded_type;
165 
166 public:
mock_observer(std::shared_ptr<test_type::test_type_state> sc)167     explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
168         : sc(sc)
169     {
170     }
171 
172     std::shared_ptr<test_type::test_type_state> sc;
173     std::vector<recorded_type> m;
174 
on_subscribe(subscriber<T>) const175     virtual void on_subscribe(subscriber<T>) const {
176         std::terminate();
177     }
subscriptions() const178     virtual std::vector<rxn::subscription> subscriptions() const {
179         std::terminate();
180     }
181 
messages() const182     virtual std::vector<recorded_type> messages() const {
183         return m;
184     }
185 };
186 
187 template<class T>
make_subscriber() const188 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
189 {
190     typedef typename rxn::notification<T> notification_type;
191     typedef rxn::recorded<typename notification_type::type> recorded_type;
192 
193     auto ts = std::make_shared<mock_observer<T>>(state);
194 
195     return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
196           // on_next
197           [ts](T value)
198           {
199               ts->m.push_back(
200                               recorded_type(ts->sc->clock(), notification_type::on_next(value)));
201           },
202           // on_error
203           [ts](rxu::error_ptr e)
204           {
205               ts->m.push_back(
206                               recorded_type(ts->sc->clock(), notification_type::on_error(e)));
207           },
208           // on_completed
209           [ts]()
210           {
211               ts->m.push_back(
212                               recorded_type(ts->sc->clock(), notification_type::on_completed()));
213           })));
214 }
215 
216 template<class T>
217 class cold_observable
218     : public rxt::detail::test_subject_base<T>
219 {
220     typedef cold_observable<T> this_type;
221     std::shared_ptr<test_type::test_type_state> sc;
222     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223     mutable std::vector<recorded_type> mv;
224     mutable std::vector<rxn::subscription> sv;
225     mutable worker controller;
226 
227 public:
228 
cold_observable(std::shared_ptr<test_type::test_type_state> sc,worker w,std::vector<recorded_type> mv)229     cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
230         : sc(sc)
231         , mv(std::move(mv))
232         , controller(w)
233     {
234     }
235 
236     template<class Iterator>
cold_observable(std::shared_ptr<test_type::test_type_state> sc,worker w,Iterator begin,Iterator end)237     cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
238         : sc(sc)
239         , mv(begin, end)
240         , controller(w)
241     {
242     }
243 
on_subscribe(subscriber<T> o) const244     virtual void on_subscribe(subscriber<T> o) const {
245         sv.push_back(rxn::subscription(sc->clock()));
246         auto index = sv.size() - 1;
247 
248         for (auto& message : mv) {
249             auto n = message.value();
250             sc->schedule_relative(message.time(), make_schedulable(
251                 controller,
252                 [n, o](const schedulable&) {
253                     if (o.is_subscribed()) {
254                         n->accept(o);
255                     }
256                 }));
257         }
258 
259         auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
260         o.add([sharedThis, index]() {
261             sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
262         });
263     }
264 
subscriptions() const265     virtual std::vector<rxn::subscription> subscriptions() const {
266         return sv;
267     }
268 
messages() const269     virtual std::vector<recorded_type> messages() const {
270         return mv;
271     }
272 };
273 
274 template<class T>
make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
276 {
277     auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278     return rxt::testable_observable<T>(co);
279 }
280 
281 template<class T>
282 class hot_observable
283     : public rxt::detail::test_subject_base<T>
284 {
285     typedef hot_observable<T> this_type;
286     std::shared_ptr<test_type::test_type_state> sc;
287     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288     typedef subscriber<T> observer_type;
289     mutable std::vector<recorded_type> mv;
290     mutable std::vector<rxn::subscription> sv;
291     mutable std::list<observer_type> observers;
292     mutable worker controller;
293 
294 public:
295 
hot_observable(std::shared_ptr<test_type::test_type_state> sc,worker w,std::vector<recorded_type> mv)296     hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
297         : sc(sc)
298         , mv(mv)
299         , controller(w)
300     {
301         for (auto& message : mv) {
302             auto n = message.value();
303             sc->schedule_absolute(message.time(), make_schedulable(
304                 controller,
305                 [this, n](const schedulable&) {
306                     auto local = this->observers;
307                     for (auto& o : local) {
308                         if (o.is_subscribed()) {
309                             n->accept(o);
310                         }
311                     }
312                 }));
313         }
314     }
315 
~hot_observable()316     virtual ~hot_observable() {}
317 
on_subscribe(observer_type o) const318     virtual void on_subscribe(observer_type o) const {
319         auto olocation = observers.insert(observers.end(), o);
320 
321         sv.push_back(rxn::subscription(sc->clock()));
322         auto index = sv.size() - 1;
323 
324         auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
325         o.add([sharedThis, index, olocation]() {
326             sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327             sharedThis->observers.erase(olocation);
328         });
329     }
330 
subscriptions() const331     virtual std::vector<rxn::subscription> subscriptions() const {
332         return sv;
333     }
334 
messages() const335     virtual std::vector<recorded_type> messages() const {
336         return mv;
337     }
338 };
339 
340 template<class T>
make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
342 {
343     auto worker = create_worker(composite_subscription());
344     auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages));
345     return rxt::testable_observable<T>(shared);
346 }
347 
348 template<class F>
349 struct is_create_source_function
350 {
351     struct not_void {};
352     template<class CF>
353     static auto check(int) -> decltype((*(CF*)nullptr)());
354     template<class CF>
355     static not_void check(...);
356 
357     static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
358 };
359 
360 }
361 
362 class test : public scheduler
363 {
364     std::shared_ptr<detail::test_type> tester;
365 public:
366 
test(std::shared_ptr<detail::test_type> t)367     explicit test(std::shared_ptr<detail::test_type> t)
368         : scheduler(std::static_pointer_cast<scheduler_interface>(t))
369         , tester(t)
370     {
371     }
372 
373     typedef detail::test_type::clock_type clock_type;
374 
375     static const long created_time = 100;
376     static const long subscribed_time = 200;
377     static const long unsubscribed_time = 1000;
378 
379     template<class T>
380     struct messages
381     {
382         typedef typename rxn::notification<T> notification_type;
383         typedef rxn::recorded<typename notification_type::type> recorded_type;
384         typedef rxn::subscription subscription_type;
385 
messagesrxcpp::schedulers::test::messages386         messages() {}
387 
388         template<typename U>
nextrxcpp::schedulers::test::messages389         static recorded_type next(long ticks, U value) {
390             return recorded_type(ticks, notification_type::on_next(std::move(value)));
391         }
392 
completedrxcpp::schedulers::test::messages393         static recorded_type completed(long ticks) {
394             return recorded_type(ticks, notification_type::on_completed());
395         }
396 
397         template<typename Exception>
errorrxcpp::schedulers::test::messages398         static recorded_type error(long ticks, Exception&& e) {
399             return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
400         }
401 
subscriberxcpp::schedulers::test::messages402         static rxn::subscription subscribe(long subscribe, long unsubscribe) {
403             return rxn::subscription(subscribe, unsubscribe);
404         }
405     };
406 
407     class test_worker : public worker
408     {
409         std::shared_ptr<detail::test_type::test_type_worker> tester;
410     public:
411 
~test_worker()412         ~test_worker() {
413         }
414 
test_worker(composite_subscription cs,std::shared_ptr<detail::test_type::test_type_worker> t)415         explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
416             : worker(cs, std::static_pointer_cast<worker_interface>(t))
417             , tester(t)
418         {
419         }
420 
is_enabled() const421         bool is_enabled() const {return tester->is_enabled();}
clock() const422         long clock() const {return tester->clock();}
423 
schedule_absolute(long when,const schedulable & a) const424         void schedule_absolute(long when, const schedulable& a) const {
425             tester->schedule_absolute(when, a);
426         }
427 
schedule_relative(long when,const schedulable & a) const428         void schedule_relative(long when, const schedulable& a) const {
429             tester->schedule_relative(when, a);
430         }
431 
432         template<class Arg0, class... ArgN>
schedule_absolute(long when,Arg0 && a0,ArgN &&...an) const433         auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
434             -> typename std::enable_if<
435                 (detail::is_action_function<Arg0>::value ||
436                 is_subscription<Arg0>::value) &&
437                 !is_schedulable<Arg0>::value>::type {
438             tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
439         }
440 
441         template<class Arg0, class... ArgN>
schedule_relative(long when,Arg0 && a0,ArgN &&...an) const442         auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
443             -> typename std::enable_if<
444                 (detail::is_action_function<Arg0>::value ||
445                 is_subscription<Arg0>::value) &&
446                 !is_schedulable<Arg0>::value>::type {
447             tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
448         }
449 
advance_to(long time) const450         void advance_to(long time) const
451         {
452             tester->advance_to(time);
453         }
454 
advance_by(long time) const455         void advance_by(long time) const
456         {
457             tester->advance_by(time);
458         }
459 
sleep(long time) const460         void sleep(long time) const
461         {
462             tester->sleep(time);
463         }
464 
465         template<class T, class F>
start(F createSource,long created,long subscribed,long unsubscribed) const466         auto start(F createSource, long created, long subscribed, long unsubscribed) const
467             -> subscriber<T, rxt::testable_observer<T>>
468         {
469             struct state_type
470             : public std::enable_shared_from_this<state_type>
471             {
472                 typedef decltype(createSource()) source_type;
473 
474                 std::unique_ptr<source_type> source;
475                 subscriber<T, rxt::testable_observer<T>> o;
476 
477                 explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
478                 : source()
479                 , o(o)
480                 {
481                 }
482             };
483             auto state = std::make_shared<state_type>(this->make_subscriber<T>());
484 
485             schedule_absolute(created, [createSource, state](const schedulable&) {
486                 state->source.reset(new typename state_type::source_type(createSource()));
487             });
488             schedule_absolute(subscribed, [state](const schedulable&) {
489                 state->source->subscribe(state->o);
490             });
491             schedule_absolute(unsubscribed, [state](const schedulable&) {
492                 state->o.unsubscribe();
493             });
494 
495             tester->start();
496 
497             return state->o;
498         }
499 
500         template<class T, class F>
start(F && createSource,long unsubscribed) const501         auto start(F&& createSource, long unsubscribed) const
502             -> subscriber<T, rxt::testable_observer<T>>
503         {
504             return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
505         }
506 
507         template<class T, class F>
start(F && createSource) const508         auto start(F&& createSource) const
509             -> subscriber<T, rxt::testable_observer<T>>
510         {
511             return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
512         }
513 
514         template<class F>
515         struct start_traits
516         {
517             typedef decltype((*(F*)nullptr)()) source_type;
518             typedef typename source_type::value_type value_type;
519             typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
520         };
521 
522         template<class F>
start(F createSource,long created,long subscribed,long unsubscribed) const523         auto start(F createSource, long created, long subscribed, long unsubscribed) const
524             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
525         {
526             return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
527         }
528 
529         template<class F>
start(F createSource,long unsubscribed) const530         auto start(F createSource, long unsubscribed) const
531             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
532         {
533             return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed);
534         }
535 
536         template<class F>
start(F createSource) const537         auto start(F createSource) const
538             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
539         {
540             return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
541         }
542 
start() const543         void start() const {
544             tester->start();
545         }
546 
547         template<class T>
make_subscriber() const548         subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
549             return tester->make_subscriber<T>();
550         }
551     };
552 
now() const553     clock_type::time_point now() const {
554         return tester->now();
555     }
556 
create_worker(composite_subscription cs=composite_subscription ()) const557     test_worker create_worker(composite_subscription cs = composite_subscription()) const {
558         return test_worker(cs, tester->create_test_type_worker_interface());
559     }
560 
is_enabled() const561     bool is_enabled() const {return tester->is_enabled();}
clock() const562     long clock() const {return tester->clock();}
563 
to_time_point(long absolute) const564     clock_type::time_point to_time_point(long absolute) const {
565         return tester->to_time_point(absolute);
566     }
567 
568     template<class T>
make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const569     rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
570         return tester->make_hot_observable(std::move(messages));
571     }
572 
573     template<class T, std::size_t size>
make_hot_observable(const T (& arr)[size]) const574     auto make_hot_observable(const T (&arr) [size]) const
575         -> decltype(tester->make_hot_observable(std::vector<T>())) {
576         return      tester->make_hot_observable(rxu::to_vector(arr));
577     }
578 
579     template<class T>
make_hot_observable(std::initializer_list<T> il) const580     auto make_hot_observable(std::initializer_list<T> il) const
581         -> decltype(tester->make_hot_observable(std::vector<T>())) {
582         return      tester->make_hot_observable(std::vector<T>(il));
583     }
584 
585     template<class T>
make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const586     rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
587         return tester->make_cold_observable(std::move(messages));
588     }
589 
590     template<class T, std::size_t size>
make_cold_observable(const T (& arr)[size]) const591     auto make_cold_observable(const T (&arr) [size]) const
592         -> decltype(tester->make_cold_observable(std::vector<T>())) {
593         return      tester->make_cold_observable(rxu::to_vector(arr));
594     }
595 
596     template<class T>
make_cold_observable(std::initializer_list<T> il) const597     auto make_cold_observable(std::initializer_list<T> il) const
598         -> decltype(tester->make_cold_observable(std::vector<T>())) {
599         return      tester->make_cold_observable(std::vector<T>(il));
600     }
601 };
602 
603 
make_test()604 inline test make_test() {
605     return test(std::make_shared<detail::test_type>());
606 }
607 
608 }
609 
identity_test()610 inline identity_one_worker identity_test() {
611     static identity_one_worker r(rxsc::make_test());
612     return r;
613 }
614 
615 }
616 
617 #endif
618