#include "rxcpp/rx.hpp"

#include "rxcpp/rx-test.hpp"
#include "catch.hpp"

#include <array>
#include <atomic>
#include <thread>
8 
9 SCENARIO("publish_synchronized sample"){
10     printf("//! [publish_synchronized sample]\n");
11     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
12         take(5).
13         publish_synchronized(rxcpp::observe_on_new_thread());
14 
15     // Subscribe from the beginning
16     values.subscribe(
__anond2b5055f0102(long v)17         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond2b5055f0202()18         [](){printf("[1] OnCompleted\n");});
19 
20     // Another subscription from the beginning
21     values.subscribe(
__anond2b5055f0302(long v)22         [](long v){printf("[2] OnNext: %ld\n", v);},
__anond2b5055f0402()23         [](){printf("[2] OnCompleted\n");});
24 
25     // Start emitting
26     values.connect();
27 
28     // Wait before subscribing
__anond2b5055f0502(long)29     rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
30         values.subscribe(
31             [](long v){printf("[3] OnNext: %ld\n", v);},
32             [](){printf("[3] OnCompleted\n");});
33     });
34 
35     // Add blocking subscription to see results
36     values.as_blocking().subscribe();
37     printf("//! [publish_synchronized sample]\n");
38 }
39 
40 SCENARIO("publish subject sample"){
41     printf("//! [publish subject sample]\n");
42     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
43         take(5).
44         publish();
45 
46     // Subscribe from the beginning
47     values.subscribe(
__anond2b5055f0802(long v)48         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond2b5055f0902()49         [](){printf("[1] OnCompleted\n");});
50 
51     // Another subscription from the beginning
52     values.subscribe(
__anond2b5055f0a02(long v)53         [](long v){printf("[2] OnNext: %ld\n", v);},
__anond2b5055f0b02()54         [](){printf("[2] OnCompleted\n");});
55 
56     // Start emitting
57     values.connect();
58 
59     // Wait before subscribing
__anond2b5055f0c02(long)60     rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
61         values.subscribe(
62             [](long v){printf("[3] OnNext: %ld\n", v);},
63             [](){printf("[3] OnCompleted\n");});
64     });
65 
66     // Add blocking subscription to see results
67     values.as_blocking().subscribe();
68     printf("//! [publish subject sample]\n");
69 }
70 
71 SCENARIO("publish behavior sample"){
72     printf("//! [publish behavior sample]\n");
73     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
74         take(5).
75         publish(0L);
76 
77     // Subscribe from the beginning
78     values.subscribe(
__anond2b5055f0f02(long v)79         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond2b5055f1002()80         [](){printf("[1] OnCompleted\n");});
81 
82     // Another subscription from the beginning
83     values.subscribe(
__anond2b5055f1102(long v)84         [](long v){printf("[2] OnNext: %ld\n", v);},
__anond2b5055f1202()85         [](){printf("[2] OnCompleted\n");});
86 
87     // Start emitting
88     values.connect();
89 
90     // Wait before subscribing
__anond2b5055f1302(long)91     rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
92         values.subscribe(
93             [](long v){printf("[3] OnNext: %ld\n", v);},
94             [](){printf("[3] OnCompleted\n");});
95     });
96 
97     // Add blocking subscription to see results
98     values.as_blocking().subscribe();
99     printf("//! [publish behavior sample]\n");
100 }
101 
102 SCENARIO("publish diamond bgthread sample"){
103     printf("//! [publish diamond bgthread sample]\n");
104 
105     /*
106      * Implements the following diamond graph chain with publish+connect on a background thread.
107      *
108      *            Values
109      *          /      \
110      *        *2        *100
111      *          \      /
112      *            Merge
113      */
114     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
115         take(5).
116         publish();
117 
118     // Left side multiplies by 2.
119     auto left = values.map(
__anond2b5055f1602(long v)120         [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );
121 
122     // Right side multiplies by 100.
123     auto right = values.map(
__anond2b5055f1702(long v)124         [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });
125 
126     // Merge the left,right sides together.
127     // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
128     auto merged = left.merge(right);
129 
130     std::atomic<bool> completed{false};
131 
132     // Add subscription to see results
133     merged.subscribe(
__anond2b5055f1802(long v) 134         [](long v) { printf("[3] OnNext: %ld\n", v); },
__anond2b5055f1902() 135         [&]() { printf("[3] OnCompleted:\n"); completed = true; });
136 
137     // Start emitting
138     values.connect();
139 
140     // Block until subscription terminates.
141     while (!completed) {}
142 
143     // Note: consider using ref_count(other) in real code, it's more composable.
144 
145     printf("//! [publish diamond bgthread sample]\n");
146 }
147 
148 SCENARIO("publish diamond samethread sample"){
149     printf("//! [publish diamond samethread sample]\n");
150 
151     /*
152      * Implements the following diamond graph chain with publish+connect diamond without using threads.
153      *
154      *            Values
155      *          /      \
156      *        *2        *100
157      *          \      /
158      *            Merge
159      */
160 
161     std::array<int, 5> a={{1, 2, 3, 4, 5}};
162     auto values = rxcpp::observable<>::iterate(a).
163         publish();
164 
165     // Left side multiplies by 2.
166     auto left = values.map(
__anond2b5055f1a02(long v)167         [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );
168 
169     // Right side multiplies by 100.
170     auto right = values.map(
__anond2b5055f1b02(long v)171         [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });
172 
173     // Merge the left,right sides together.
174     // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
175     auto merged = left.merge(right);
176 
177     // Add subscription to see results
178     merged.subscribe(
__anond2b5055f1c02(long v) 179         [](long v) { printf("[3] OnNext: %ld\n", v); },
__anond2b5055f1d02() 180         [&]() { printf("[3] OnCompleted:\n"); });
181 
182     // Start emitting
183     // - because there are no other threads here, the connect call blocks until the source
184     //   calls on_completed.
185     values.connect();
186 
187     // Note: consider using ref_count(other) in real code, it's more composable.
188 
189     printf("//! [publish diamond samethread sample]\n");
190 }
191 
192 // see also examples/doxygen/ref_count.cpp for more diamond examples
193