1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 #include <atomic> 7 #include <array> 8 9 SCENARIO("publish_synchronized sample"){ 10 printf("//! [publish_synchronized sample]\n"); 11 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). 12 take(5). 13 publish_synchronized(rxcpp::observe_on_new_thread()); 14 15 // Subscribe from the beginning 16 values.subscribe( __anond2b5055f0102(long v)17 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond2b5055f0202()18 [](){printf("[1] OnCompleted\n");}); 19 20 // Another subscription from the beginning 21 values.subscribe( __anond2b5055f0302(long v)22 [](long v){printf("[2] OnNext: %ld\n", v);}, __anond2b5055f0402()23 [](){printf("[2] OnCompleted\n");}); 24 25 // Start emitting 26 values.connect(); 27 28 // Wait before subscribing __anond2b5055f0502(long)29 rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ 30 values.subscribe( 31 [](long v){printf("[3] OnNext: %ld\n", v);}, 32 [](){printf("[3] OnCompleted\n");}); 33 }); 34 35 // Add blocking subscription to see results 36 values.as_blocking().subscribe(); 37 printf("//! [publish_synchronized sample]\n"); 38 } 39 40 SCENARIO("publish subject sample"){ 41 printf("//! [publish subject sample]\n"); 42 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 43 take(5). 44 publish(); 45 46 // Subscribe from the beginning 47 values.subscribe( __anond2b5055f0802(long v)48 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond2b5055f0902()49 [](){printf("[1] OnCompleted\n");}); 50 51 // Another subscription from the beginning 52 values.subscribe( __anond2b5055f0a02(long v)53 [](long v){printf("[2] OnNext: %ld\n", v);}, __anond2b5055f0b02()54 [](){printf("[2] OnCompleted\n");}); 55 56 // Start emitting 57 values.connect(); 58 59 // Wait before subscribing __anond2b5055f0c02(long)60 rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ 61 values.subscribe( 62 [](long v){printf("[3] OnNext: %ld\n", v);}, 63 [](){printf("[3] OnCompleted\n");}); 64 }); 65 66 // Add blocking subscription to see results 67 values.as_blocking().subscribe(); 68 printf("//! [publish subject sample]\n"); 69 } 70 71 SCENARIO("publish behavior sample"){ 72 printf("//! [publish behavior sample]\n"); 73 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 74 take(5). 75 publish(0L); 76 77 // Subscribe from the beginning 78 values.subscribe( __anond2b5055f0f02(long v)79 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond2b5055f1002()80 [](){printf("[1] OnCompleted\n");}); 81 82 // Another subscription from the beginning 83 values.subscribe( __anond2b5055f1102(long v)84 [](long v){printf("[2] OnNext: %ld\n", v);}, __anond2b5055f1202()85 [](){printf("[2] OnCompleted\n");}); 86 87 // Start emitting 88 values.connect(); 89 90 // Wait before subscribing __anond2b5055f1302(long)91 rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ 92 values.subscribe( 93 [](long v){printf("[3] OnNext: %ld\n", v);}, 94 [](){printf("[3] OnCompleted\n");}); 95 }); 96 97 // Add blocking subscription to see results 98 values.as_blocking().subscribe(); 99 printf("//! [publish behavior sample]\n"); 100 } 101 102 SCENARIO("publish diamond bgthread sample"){ 103 printf("//! [publish diamond bgthread sample]\n"); 104 105 /* 106 * Implements the following diamond graph chain with publish+connect on a background thread. 107 * 108 * Values 109 * / \ 110 * *2 *100 111 * \ / 112 * Merge 113 */ 114 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 115 take(5). 116 publish(); 117 118 // Left side multiplies by 2. 119 auto left = values.map( __anond2b5055f1602(long v)120 [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} ); 121 122 // Right side multiplies by 100. 123 auto right = values.map( __anond2b5055f1702(long v)124 [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; }); 125 126 // Merge the left,right sides together. 127 // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...]. 128 auto merged = left.merge(right); 129 130 std::atomic<bool> completed{false}; 131 132 // Add subscription to see results 133 merged.subscribe( __anond2b5055f1802(long v) 134 [](long v) { printf("[3] OnNext: %ld\n", v); }, __anond2b5055f1902() 135 [&]() { printf("[3] OnCompleted:\n"); completed = true; }); 136 137 // Start emitting 138 values.connect(); 139 140 // Block until subscription terminates. 141 while (!completed) {} 142 143 // Note: consider using ref_count(other) in real code, it's more composable. 144 145 printf("//! [publish diamond bgthread sample]\n"); 146 } 147 148 SCENARIO("publish diamond samethread sample"){ 149 printf("//! [publish diamond samethread sample]\n"); 150 151 /* 152 * Implements the following diamond graph chain with publish+connect diamond without using threads. 153 * 154 * Values 155 * / \ 156 * *2 *100 157 * \ / 158 * Merge 159 */ 160 161 std::array<int, 5> a={{1, 2, 3, 4, 5}}; 162 auto values = rxcpp::observable<>::iterate(a). 163 publish(); 164 165 // Left side multiplies by 2. 166 auto left = values.map( __anond2b5055f1a02(long v)167 [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} ); 168 169 // Right side multiplies by 100. 170 auto right = values.map( __anond2b5055f1b02(long v)171 [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; }); 172 173 // Merge the left,right sides together. 174 // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...]. 175 auto merged = left.merge(right); 176 177 // Add subscription to see results 178 merged.subscribe( __anond2b5055f1c02(long v) 179 [](long v) { printf("[3] OnNext: %ld\n", v); }, __anond2b5055f1d02() 180 [&]() { printf("[3] OnCompleted:\n"); }); 181 182 // Start emitting 183 // - because there are no other threads here, the connect call blocks until the source 184 // calls on_completed. 185 values.connect(); 186 187 // Note: consider using ref_count(other) in real code, it's more composable. 188 189 printf("//! [publish diamond samethread sample]\n"); 190 } 191 192 // see also examples/doxygen/ref_count.cpp for more diamond examples 193