1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("take_until sample"){ 7 printf("//! [take_until sample]\n"); 8 auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); 9 auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)); 10 auto values = source.take_until(trigger); 11 values. 12 subscribe( __anon063b5e980102(long v)13 [](long v){printf("OnNext: %ld\n", v);}, __anon063b5e980202()14 [](){printf("OnCompleted\n");}); 15 printf("//! [take_until sample]\n"); 16 } 17 18 SCENARIO("take_until time sample"){ 19 printf("//! [take_until time sample]\n"); 20 auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7); 21 auto values = source.take_until(std::chrono::steady_clock::now() + std::chrono::milliseconds(25)); 22 values. 23 subscribe( __anon063b5e980302(long v)24 [](long v){printf("OnNext: %ld\n", v);}, __anon063b5e980402()25 [](){printf("OnCompleted\n");}); 26 printf("//! [take_until time sample]\n"); 27 } 28 29 #include "main.hpp" 30 31 SCENARIO("threaded take_until sample"){ 32 printf("//! [threaded take_until sample]\n"); 33 printf("[thread %s] Start task\n", get_pid().c_str()); __anon063b5e980502(long v)34 auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ 35 printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v); 36 return v; 37 }); __anon063b5e980602(long v)38 auto trigger = rxcpp::observable<>::timer(std::chrono::milliseconds(25)).map([](long v){ 39 printf("[thread %s] Trigger emits, value = %ld\n", get_pid().c_str(), v); 40 return v; 41 }); 42 auto values = source.take_until(trigger, rxcpp::observe_on_new_thread()); 43 values. 44 as_blocking(). 45 subscribe( __anon063b5e980702(long v)46 [](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);}, __anon063b5e980802()47 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 48 printf("[thread %s] Finish task\n", get_pid().c_str()); 49 printf("//! [threaded take_until sample]\n"); 50 } 51 52 SCENARIO("threaded take_until time sample"){ 53 printf("//! [threaded take_until time sample]\n"); 54 printf("[thread %s] Start task\n", get_pid().c_str()); __anon063b5e980902(long v)55 auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(10)).take(7).map([](long v){ 56 printf("[thread %s] Source emits, value = %ld\n", get_pid().c_str(), v); 57 return v; 58 }); 59 auto scheduler = rxcpp::observe_on_new_thread(); 60 auto values = source.take_until(scheduler.now() + std::chrono::milliseconds(25), scheduler); 61 values. 62 as_blocking(). 63 subscribe( __anon063b5e980a02(long v)64 [](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);}, __anon063b5e980b02()65 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 66 printf("[thread %s] Finish task\n", get_pid().c_str()); 67 printf("//! [threaded take_until time sample]\n"); 68 } 69