1 
2 #include "rxcpp/rx.hpp"
3 // create alias' to simplify code
4 // these are owned by the user so that
5 // conflicts can be managed by the user.
6 namespace rx=rxcpp;
7 namespace rxsub=rxcpp::subjects;
8 namespace rxu=rxcpp::util;
9 
// At this time, RxCpp will fail to compile if the contents
// of the std namespace are merged into the global namespace.
// DO NOT USE: 'using namespace std;'
13 
main()14 int main()
15 {
16     // works
17     {
18         auto published_observable =
19             rx::observable<>::range(1)
20             .filter([](int i)
21             {
22                 std::cout << i << std::endl;
23                 std::this_thread::sleep_for(std::chrono::milliseconds(300));
24                 return true;
25             })
26             .subscribe_on(rx::observe_on_new_thread())
27             .publish();
28 
29         auto subscription = published_observable.connect();
30         std::this_thread::sleep_for(std::chrono::seconds(1));
31         subscription.unsubscribe();
32         std::cout << "unsubscribed" << std::endl << std::endl;
33     }
34 
35     // idiomatic (prefer operators)
36     {
37         auto published_observable =
38             rx::observable<>::interval(std::chrono::milliseconds(300))
39             .subscribe_on(rx::observe_on_new_thread())
40             .publish();
41 
42         published_observable.
43             ref_count().
44             take_until(rx::observable<>::timer(std::chrono::seconds(1))).
45             finally([](){
46                 std::cout << "unsubscribed" << std::endl << std::endl;
47             }).
48             subscribe([](int i){
49                 std::cout << i << std::endl;
50             });
51     }
52 
53     return 0;
54 }
55