1 #include "../test.h" 2 #include <rxcpp/operators/rx-reduce.hpp> 3 #include <rxcpp/operators/rx-merge.hpp> 4 #include <rxcpp/operators/rx-observe_on.hpp> 5 6 const int static_onnextcalls = 1000000; 7 8 9 SCENARIO("synchronize merge ranges", "[!hide][range][synchronize][merge][perf]"){ 10 const int& onnextcalls = static_onnextcalls; 11 GIVEN("some ranges"){ 12 WHEN("generating ints"){ 13 using namespace std::chrono; 14 typedef steady_clock clock; 15 16 auto so = rx::synchronize_event_loop(); 17 18 int n = 1; 19 auto sectionCount = onnextcalls / 3; 20 auto start = clock::now(); 21 int c = rxs::range(0, sectionCount - 1, 1, so) 22 .merge( 23 so, 24 rxs::range(sectionCount, (sectionCount * 2) - 1, 1, so), 25 rxs::range(sectionCount * 2, onnextcalls - 1, 1, so)) 26 .as_blocking() 27 .count(); 28 29 auto finish = clock::now(); 30 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 31 duration_cast<milliseconds>(start.time_since_epoch()); 32 std::cout << "merge sync ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl; 33 } 34 } 35 } 36 37 SCENARIO("observe_on merge ranges", "[!hide][range][observe_on][merge][perf]"){ 38 const int& onnextcalls = static_onnextcalls; 39 GIVEN("some ranges"){ 40 WHEN("generating ints"){ 41 using namespace std::chrono; 42 typedef steady_clock clock; 43 44 auto so = rx::observe_on_event_loop(); 45 46 int n = 1; 47 auto sectionCount = onnextcalls / 3; 48 auto start = clock::now(); 49 int c = rxs::range(0, sectionCount - 1, 1, so) 50 .merge( 51 so, 52 rxs::range(sectionCount, (sectionCount * 2) - 1, 1, so), 53 rxs::range(sectionCount * 2, onnextcalls - 1, 1, so)) 54 .as_blocking() 55 .count(); 56 57 auto finish = clock::now(); 58 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 59 duration_cast<milliseconds>(start.time_since_epoch()); 60 std::cout << "merge observe_on ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl; 61 } 62 } 63 } 64 65 SCENARIO("serialize merge ranges", "[!hide][range][serialize][merge][perf]"){ 66 const int& onnextcalls = static_onnextcalls; 67 GIVEN("some ranges"){ 68 WHEN("generating ints"){ 69 using namespace std::chrono; 70 typedef steady_clock clock; 71 72 auto so = rx::serialize_event_loop(); 73 74 int n = 1; 75 auto sectionCount = onnextcalls / 3; 76 auto start = clock::now(); 77 int c = rxs::range(0, sectionCount - 1, 1, so) 78 .merge( 79 so, 80 rxs::range(sectionCount, (sectionCount * 2) - 1, 1, so), 81 rxs::range(sectionCount * 2, onnextcalls - 1, 1, so)) 82 .as_blocking() 83 .count(); 84 85 auto finish = clock::now(); 86 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 87 duration_cast<milliseconds>(start.time_since_epoch()); 88 std::cout << "merge serial ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl; 89 } 90 } 91 } 92 93 SCENARIO("merge completes", "[merge][join][operators]"){ 94 GIVEN("1 hot observable with 3 cold observables of ints."){ 95 auto sc = rxsc::make_test(); 96 auto w = sc.create_worker(); 97 const rxsc::test::messages<int> on; 98 const rxsc::test::messages<rx::observable<int>> o_on; 99 100 auto ys1 = sc.make_cold_observable({ 101 on.next(10, 101), 102 on.next(20, 102), 103 on.next(110, 103), 104 on.next(120, 104), 105 on.next(210, 105), 106 on.next(220, 106), 107 on.completed(230) 108 }); 109 110 auto ys2 = sc.make_cold_observable({ 111 on.next(10, 201), 112 on.next(20, 202), 113 on.next(30, 203), 114 on.next(40, 204), 115 on.completed(50) 116 }); 117 118 auto ys3 = sc.make_cold_observable({ 119 on.next(10, 301), 120 on.next(20, 302), 121 on.next(30, 303), 122 on.next(40, 304), 123 on.next(120, 305), 124 on.completed(150) 125 }); 126 127 auto xs = sc.make_hot_observable({ 128 o_on.next(300, ys1), 129 o_on.next(400, ys2), 130 o_on.next(500, ys3), 131 o_on.completed(600) 132 }); 133 134 WHEN("each int is merged"){ 135 136 auto res = w.start( __anon6da9c71a0102() 137 [&]() { 138 return xs 139 | rxo::merge() 140 // forget type to workaround lambda deduction bug on msvc 2013 141 | rxo::as_dynamic(); 142 } 143 ); 144 145 THEN("the output contains merged ints"){ 146 auto required = rxu::to_vector({ 147 on.next(310, 101), 148 on.next(320, 102), 149 on.next(410, 103), 150 on.next(410, 201), 151 on.next(420, 104), 152 on.next(420, 202), 153 on.next(430, 203), 154 on.next(440, 204), 155 on.next(510, 105), 156 on.next(510, 301), 157 on.next(520, 106), 158 on.next(520, 302), 159 on.next(530, 303), 160 on.next(540, 304), 161 on.next(620, 305), 162 on.completed(650) 163 }); 164 auto actual = res.get_observer().messages(); 165 REQUIRE(required == actual); 166 } 167 168 THEN("there was one subscription and one unsubscription to the xs"){ 169 auto required = rxu::to_vector({ 170 on.subscribe(200, 600) 171 }); 172 auto actual = xs.subscriptions(); 173 REQUIRE(required == actual); 174 } 175 176 THEN("there was one subscription and one unsubscription to the ys1"){ 177 auto required = rxu::to_vector({ 178 on.subscribe(300, 530) 179 }); 180 auto actual = ys1.subscriptions(); 181 REQUIRE(required == actual); 182 } 183 184 THEN("there was one subscription and one unsubscription to the ys2"){ 185 auto required = rxu::to_vector({ 186 on.subscribe(400, 450) 187 }); 188 auto actual = ys2.subscriptions(); 189 REQUIRE(required == actual); 190 } 191 192 THEN("there was one subscription and one unsubscription to the ys3"){ 193 auto required = rxu::to_vector({ 194 on.subscribe(500, 650) 195 }); 196 auto actual = ys3.subscriptions(); 197 REQUIRE(required == actual); 198 } 199 } 200 } 201 } 202 203 SCENARIO("variadic merge completes", "[merge][join][operators]"){ 204 GIVEN("1 hot observable with 3 cold observables of ints."){ 205 auto sc = rxsc::make_test(); 206 auto w = sc.create_worker(); 207 const rxsc::test::messages<int> on; 208 const rxsc::test::messages<rx::observable<int>> o_on; 209 210 auto ys1 = sc.make_cold_observable({ 211 on.next(10, 101), 212 on.next(20, 102), 213 on.next(110, 103), 214 on.next(120, 104), 215 on.next(210, 105), 216 on.next(220, 106), 217 on.completed(230) 218 }); 219 220 auto ys2 = sc.make_cold_observable({ 221 on.next(10, 201), 222 on.next(20, 202), 223 on.next(30, 203), 224 on.next(40, 204), 225 on.completed(50) 226 }); 227 228 auto ys3 = sc.make_cold_observable({ 229 on.next(10, 301), 230 on.next(20, 302), 231 on.next(30, 303), 232 on.next(40, 304), 233 on.next(120, 305), 234 on.completed(150) 235 }); 236 237 WHEN("each int is merged"){ 238 239 auto res = w.start( __anon6da9c71a0202() 240 [&]() { 241 return ys1 242 .merge(ys2, ys3); 243 } 244 ); 245 246 THEN("the output contains merged ints"){ 247 auto required = rxu::to_vector({ 248 on.next(210, 101), 249 on.next(210, 201), 250 on.next(210, 301), 251 on.next(220, 102), 252 on.next(220, 202), 253 on.next(220, 302), 254 on.next(230, 203), 255 on.next(230, 303), 256 on.next(240, 204), 257 on.next(240, 304), 258 on.next(310, 103), 259 on.next(320, 104), 260 on.next(320, 305), 261 on.next(410, 105), 262 on.next(420, 106), 263 on.completed(430) 264 }); 265 auto actual = res.get_observer().messages(); 266 REQUIRE(required == actual); 267 } 268 269 THEN("there was one subscription and one unsubscription to the ys1"){ 270 auto required = rxu::to_vector({ 271 on.subscribe(200, 430) 272 }); 273 auto actual = ys1.subscriptions(); 274 REQUIRE(required == actual); 275 } 276 277 THEN("there was one subscription and one unsubscription to the ys2"){ 278 auto required = rxu::to_vector({ 279 on.subscribe(200, 250) 280 }); 281 auto actual = ys2.subscriptions(); 282 REQUIRE(required == actual); 283 } 284 285 THEN("there was one subscription and one unsubscription to the ys3"){ 286 auto required = rxu::to_vector({ 287 on.subscribe(200, 350) 288 }); 289 auto actual = ys3.subscriptions(); 290 REQUIRE(required == actual); 291 } 292 } 293 } 294 } 295