1 #include "rxcpp/rx.hpp"
2 
3 #include "rxcpp/rx-test.hpp"
4 #include "catch.hpp"
5 
6 SCENARIO("take_until sample"){
7     printf("//! [take_until sample]\n");
8     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
9     auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25));
10     auto values = source.take_until(trigger);
11     values.
12         subscribe(
__anon063b5e980102(long v)13             [](long v){printf("OnNext: %ld\n", v);},
__anon063b5e980202()14             [](){printf("OnCompleted\n");});
15     printf("//! [take_until sample]\n");
16 }
17 
18 SCENARIO("take_until time sample"){
19     printf("//! [take_until time sample]\n");
20     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7);
21     auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25));
22     values.
23         subscribe(
__anon063b5e980302(long v)24             [](long v){printf("OnNext: %ld\n", v);},
__anon063b5e980402()25             [](){printf("OnCompleted\n");});
26     printf("//! [take_until time sample]\n");
27 }
28 
29 #include "main.hpp"
30 
31 SCENARIO("threaded take_until sample"){
32     printf("//! [threaded take_until sample]\n");
33     printf("[thread %s] Start task\n", get_pid().c_str());
__anon063b5e980502(long v)34     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
35         printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
36         return v;
37     });
__anon063b5e980602(long v)38     auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){
39         printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v);
40         return v;
41     });
42     auto values = source.take_until(trigger, rxcpp::observe_on_new_thread());
43     values.
44         as_blocking().
45         subscribe(
__anon063b5e980702(long v)46             [](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
__anon063b5e980802()47             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
48     printf("[thread %s] Finish task\n", get_pid().c_str());
49     printf("//! [threaded take_until sample]\n");
50 }
51 
52 SCENARIO("threaded take_until time sample"){
53     printf("//! [threaded take_until time sample]\n");
54     printf("[thread %s] Start task\n", get_pid().c_str());
__anon063b5e980902(long v)55     auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){
56         printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v);
57         return v;
58     });
59     auto scheduler = rxcpp::observe_on_new_thread();
60     auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler);
61     values.
62         as_blocking().
63         subscribe(
__anon063b5e980a02(long v)64             [](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
__anon063b5e980b02()65             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
66     printf("[thread %s] Finish task\n", get_pid().c_str());
67     printf("//! [threaded take_until time sample]\n");
68 }
69