1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("range sample"){ 7 printf("//! [range sample]\n"); 8 auto values1 = rxcpp::observable<>::range(1, 5); 9 values1. 10 subscribe( __anon0b2e54350102(int v)11 [](int v){printf("OnNext: %d\n", v);}, __anon0b2e54350202()12 [](){printf("OnCompleted\n");}); 13 printf("//! [range sample]\n"); 14 } 15 16 #include "main.hpp" 17 18 SCENARIO("threaded range sample"){ 19 printf("//! [threaded range sample]\n"); 20 printf("[thread %s] Start task\n", get_pid().c_str()); 21 auto values = rxcpp::observable<>::range(1, 3, rxcpp::observe_on_new_thread()); 22 auto s = values. __anon0b2e54350302(int v) 23 map([](int v) { return std::make_tuple(get_pid(), v);}); 24 s. 25 as_blocking(). 26 subscribe( 27 rxcpp::util::apply_to( __anon0b2e54350402(const std::string pid, int v) 28 [](const std::string pid, int v) { 29 printf("[thread %s] OnNext: %d\n", pid.c_str(), v); 30 }), __anon0b2e54350502()31 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 32 printf("[thread %s] Finish task\n", get_pid().c_str()); 33 printf("//! [threaded range sample]\n"); 34 } 35 36 SCENARIO("subscribe_on range sample"){ 37 printf("//! [subscribe_on range sample]\n"); 38 printf("[thread %s] Start task\n", get_pid().c_str()); 39 auto values = rxcpp::observable<>::range(1, 3); 40 auto s = values. 41 subscribe_on(rxcpp::observe_on_new_thread()). __anon0b2e54350602(int v) 42 map([](int v) { return std::make_tuple(get_pid(), v);}); 43 s. 44 as_blocking(). 45 subscribe( 46 rxcpp::util::apply_to( __anon0b2e54350702(const std::string pid, int v) 47 [](const std::string pid, int v) { 48 printf("[thread %s] OnNext: %d\n", pid.c_str(), v); 49 }), __anon0b2e54350802()50 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 51 printf("[thread %s] Finish task\n", get_pid().c_str()); 52 printf("//! [subscribe_on range sample]\n"); 53 } 54 55 56 SCENARIO("range concat sample"){ 57 printf("//! [range concat sample]\n"); 58 59 auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers 60 61 auto s1 = values. 62 take(3). __anon0b2e54350902(int v) 63 map([](int v) { return std::make_tuple("1:", v);}); 64 65 auto s2 = values. 66 take(3). __anon0b2e54350a02(int v) 67 map([](int v) { return std::make_tuple("2:", v);}); 68 69 s1. 70 concat(s2). 71 subscribe(rxcpp::util::apply_to( __anon0b2e54350b02(const char* s, int p) 72 [](const char* s, int p) { 73 printf("%s %d\n", s, p); 74 })); 75 printf("//! [range concat sample]\n"); 76 } 77 78 SCENARIO("range merge sample"){ 79 printf("//! [range merge sample]\n"); 80 81 auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers 82 83 auto s1 = values. __anon0b2e54350c02(int v) 84 map([](int v) { return std::make_tuple("1:", v);}); 85 86 auto s2 = values. __anon0b2e54350d02(int v) 87 map([](int v) { return std::make_tuple("2:", v);}); 88 89 s1. 90 merge(s2). 91 take(6). 92 as_blocking(). 93 subscribe(rxcpp::util::apply_to( __anon0b2e54350e02(const char* s, int p) 94 [](const char* s, int p) { 95 printf("%s %d\n", s, p); 96 })); 97 printf("//! [range merge sample]\n"); 98 } 99 100 SCENARIO("threaded range concat sample"){ 101 printf("//! [threaded range concat sample]\n"); 102 auto threads = rxcpp::observe_on_event_loop(); 103 104 auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers 105 106 auto s1 = values. 107 subscribe_on(threads). 108 take(3). __anon0b2e54350f02(int v) 109 map([](int v) { std::this_thread::yield(); return std::make_tuple("1:", v);}); 110 111 auto s2 = values. 112 subscribe_on(threads). 113 take(3). __anon0b2e54351002(int v) 114 map([](int v) { std::this_thread::yield(); return std::make_tuple("2:", v);}); 115 116 s1. 117 concat(s2). 118 observe_on(threads). 119 as_blocking(). 120 subscribe(rxcpp::util::apply_to( __anon0b2e54351102(const char* s, int p) 121 [](const char* s, int p) { 122 printf("%s %d\n", s, p); 123 })); 124 printf("//! [threaded range concat sample]\n"); 125 } 126 127 SCENARIO("threaded range merge sample"){ 128 printf("//! [threaded range merge sample]\n"); 129 auto threads = rxcpp::observe_on_event_loop(); 130 131 auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers 132 133 auto s1 = values. 134 subscribe_on(threads). __anon0b2e54351202(int v) 135 map([](int v) { std::this_thread::yield(); return std::make_tuple("1:", v);}); 136 137 auto s2 = values. 138 subscribe_on(threads). __anon0b2e54351302(int v) 139 map([](int v) { std::this_thread::yield(); return std::make_tuple("2:", v);}); 140 141 s1. 142 merge(s2). 143 take(6). 144 observe_on(threads). 145 as_blocking(). 146 subscribe(rxcpp::util::apply_to( __anon0b2e54351402(const char* s, int p) 147 [](const char* s, int p) { 148 printf("%s %d\n", s, p); 149 })); 150 printf("//! [threaded range merge sample]\n"); 151 } 152 153