1 2 #include "rxcpp/rx.hpp" 3 // create alias' to simplify code 4 // these are owned by the user so that 5 // conflicts can be managed by the user. 6 namespace rx=rxcpp; 7 namespace rxsub=rxcpp::subjects; 8 namespace rxu=rxcpp::util; 9 10 // At this time, RxCpp will fail to compile if the contents 11 // of the std namespace are merged into the global namespace 12 // DO NOT USE: 'using namespace std;' 13 main()14int main() 15 { 16 // works 17 { 18 auto published_observable = 19 rx::observable<>::range(1) 20 .filter([](int i) 21 { 22 std::cout << i << std::endl; 23 std::this_thread::sleep_for(std::chrono::milliseconds(300)); 24 return true; 25 }) 26 .subscribe_on(rx::observe_on_new_thread()) 27 .publish(); 28 29 auto subscription = published_observable.connect(); 30 std::this_thread::sleep_for(std::chrono::seconds(1)); 31 subscription.unsubscribe(); 32 std::cout << "unsubscribed" << std::endl << std::endl; 33 } 34 35 // idiomatic (prefer operators) 36 { 37 auto published_observable = 38 rx::observable<>::interval(std::chrono::milliseconds(300)) 39 .subscribe_on(rx::observe_on_new_thread()) 40 .publish(); 41 42 published_observable. 43 ref_count(). 44 take_until(rx::observable<>::timer(std::chrono::seconds(1))). 45 finally([](){ 46 std::cout << "unsubscribed" << std::endl << std::endl; 47 }). 48 subscribe([](int i){ 49 std::cout << i << std::endl; 50 }); 51 } 52 53 return 0; 54 } 55