1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("zip sample"){ 7 printf("//! [zip sample]\n"); 8 auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); 9 auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); 10 auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); 11 auto values = o1.zip(o2, o3); 12 values. 13 take(3). 14 subscribe( __anonc943069b0102(std::tuple<int, int, int> v)15 [](std::tuple<int, int, int> v){printf("OnNext: %d, %d, %d\n", std::get<0>(v), std::get<1>(v), std::get<2>(v));}, __anonc943069b0202()16 [](){printf("OnCompleted\n");}); 17 printf("//! [zip sample]\n"); 18 } 19 20 #include "main.hpp" 21 22 SCENARIO("Coordination zip sample"){ 23 printf("//! [Coordination zip sample]\n"); 24 printf("[thread %s] Start task\n", get_pid().c_str()); 25 auto thr = rxcpp::synchronize_event_loop(); __anonc943069b0302(int v) 26 auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)).map([](int v) { 27 printf("[thread %s] Source1 OnNext: %d\n", get_pid().c_str(), v); 28 return v; 29 }); __anonc943069b0402(int v) 30 auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)).map([](int v) { 31 printf("[thread %s] Source2 OnNext: %d\n", get_pid().c_str(), v); 32 return v; 33 }); __anonc943069b0502(int v) 34 auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)).map([](int v) { 35 printf("[thread %s] Source3 OnNext: %d\n", get_pid().c_str(), v); 36 return v; 37 }); 38 auto values = o1.zip(thr, o2, o3); 39 values. 40 take(3). 41 as_blocking(). 42 subscribe( __anonc943069b0602(std::tuple<int, int, int> v)43 [](std::tuple<int, int, int> v){printf("[thread %s] OnNext: %d, %d, %d\n", get_pid().c_str(), std::get<0>(v), std::get<1>(v), std::get<2>(v));}, __anonc943069b0702()44 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 45 printf("[thread %s] Finish task\n", get_pid().c_str()); 46 printf("//! [Coordination zip sample]\n"); 47 } 48 49 SCENARIO("Selector zip sample"){ 50 printf("//! [Selector zip sample]\n"); 51 auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); 52 auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); 53 auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); 54 auto values = o1 | rxcpp::operators::zip( __anonc943069b0802(int v1, int v2, int v3) 55 [](int v1, int v2, int v3) { 56 return 100 * v1 + 10 * v2 + v3; 57 }, 58 o2, o3); 59 values. 60 take(3). 61 subscribe( __anonc943069b0902(int v)62 [](int v){printf("OnNext: %d\n", v);}, __anonc943069b0a02()63 [](){printf("OnCompleted\n");}); 64 printf("//! [Selector zip sample]\n"); 65 } 66 67 SCENARIO("Coordination+Selector zip sample"){ 68 printf("//! [Coordination+Selector zip sample]\n"); 69 auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1)); 70 auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2)); 71 auto o3 = rxcpp::observable<>::interval(std::chrono::milliseconds(3)); 72 auto values = o1.zip( 73 rxcpp::observe_on_new_thread(), __anonc943069b0b02(int v1, int v2, int v3) 74 [](int v1, int v2, int v3) { 75 return 100 * v1 + 10 * v2 + v3; 76 }, 77 o2, o3); 78 values. 79 take(3). 80 as_blocking(). 81 subscribe( __anonc943069b0c02(int v)82 [](int v){printf("OnNext: %d\n", v);}, __anonc943069b0d02()83 [](){printf("OnCompleted\n");}); 84 printf("//! [Coordination+Selector zip sample]\n"); 85 } 86