1 #include "../test.h" 2 #include <rxcpp/operators/rx-take_while.hpp> 3 4 namespace { 5 class not_equal_to { 6 int value; 7 public: not_equal_to(int value)8 not_equal_to(int value) : value(value) { } operator ()(int i) const9 bool operator()(int i) const { 10 return i != value; 11 } 12 }; 13 } 14 15 SCENARIO("take_while not equal to 4", "[take_while][operators]"){ 16 GIVEN("a source"){ 17 auto sc = rxsc::make_test(); 18 auto w = sc.create_worker(); 19 const rxsc::test::messages<int> on; 20 21 auto xs = sc.make_hot_observable({ 22 on.next(150, 1), 23 on.next(210, 2), 24 on.next(220, 3), 25 on.next(230, 4), 26 on.next(240, 5), 27 on.completed(250) 28 }); 29 30 WHEN("values before 4 are taken"){ 31 32 auto res = w.start( __anon6e0104070202() 33 [xs]() { 34 return xs 35 .take_while(not_equal_to(4)) 36 .as_dynamic(); 37 } 38 ); 39 40 THEN("the output only contains items sent while subscribed"){ 41 auto required = rxu::to_vector({ 42 on.next(210, 2), 43 on.next(220, 3), 44 on.completed(230) 45 }); 46 auto actual = res.get_observer().messages(); 47 REQUIRE(required == actual); 48 } 49 50 THEN("there was 1 subscription/unsubscription to the source"){ 51 auto required = rxu::to_vector({ 52 on.subscribe(200, 230) 53 }); 54 auto actual = xs.subscriptions(); 55 REQUIRE(required == actual); 56 } 57 58 } 59 } 60 } 61 62 SCENARIO("take_while, complete after", "[take_while][operators]"){ 63 GIVEN("a source"){ 64 auto sc = rxsc::make_test(); 65 auto w = sc.create_worker(); 66 const rxsc::test::messages<int> on; 67 68 auto xs = sc.make_hot_observable({ 69 on.next(70, 6), 70 on.next(150, 4), 71 on.next(210, 9), 72 on.next(230, 13), 73 on.next(270, 7), 74 on.next(280, 1), 75 on.next(300, -1), 76 on.next(310, 3), 77 on.next(340, 8), 78 on.next(370, 11), 79 on.next(410, 15), 80 on.next(415, 16), 81 on.next(460, 72), 82 on.next(510, 76), 83 on.next(560, 32), 84 on.next(570, -100), 85 on.next(580, -3), 86 on.next(590, 5), 87 on.next(630, 10), 88 on.completed(690) 89 }); 90 91 WHEN("all are taken"){ 92 93 auto res = w.start( __anon6e0104070302() 94 [xs]() { 95 return xs 96 .take_while(not_equal_to(0)) 97 // forget type to workaround lambda deduction bug on msvc 2013 98 .as_dynamic(); 99 } 100 ); 101 102 THEN("the output only contains items sent while subscribed"){ 103 auto required = rxu::to_vector({ 104 on.next(210, 9), 105 on.next(230, 13), 106 on.next(270, 7), 107 on.next(280, 1), 108 on.next(300, -1), 109 on.next(310, 3), 110 on.next(340, 8), 111 on.next(370, 11), 112 on.next(410, 15), 113 on.next(415, 16), 114 on.next(460, 72), 115 on.next(510, 76), 116 on.next(560, 32), 117 on.next(570, -100), 118 on.next(580, -3), 119 on.next(590, 5), 120 on.next(630, 10), 121 on.completed(690) 122 }); 123 auto actual = res.get_observer().messages(); 124 REQUIRE(required == actual); 125 } 126 127 THEN("there was 1 subscription/unsubscription to the source"){ 128 auto required = rxu::to_vector({ 129 on.subscribe(200, 690) 130 }); 131 auto actual = xs.subscriptions(); 132 REQUIRE(required == actual); 133 } 134 135 } 136 } 137 } 138 139 SCENARIO("take_while, complete before", "[take_while][operators]"){ 140 GIVEN("a source"){ 141 auto sc = rxsc::make_test(); 142 auto w = sc.create_worker(); 143 const rxsc::test::messages<int> on; 144 145 auto xs = sc.make_hot_observable({ 146 on.next(70, 6), 147 on.next(150, 4), 148 on.next(210, 9), 149 on.next(230, 13), 150 on.next(270, 7), 151 on.next(280, 1), 152 on.next(300, -1), 153 on.next(310, 3), 154 on.next(340, 8), 155 on.next(370, 11), 156 on.next(410, 15), 157 on.next(415, 16), 158 on.next(460, 72), 159 on.next(510, 76), 160 on.next(560, 32), 161 on.next(570, -100), 162 on.next(580, -3), 163 on.next(590, 5), 164 on.next(630, 10), 165 on.completed(690) 166 }); 167 168 WHEN("10 values are taken"){ 169 170 auto res = w.start( __anon6e0104070402() 171 [xs]() { 172 return xs 173 .take_while(not_equal_to(72)) 174 // forget type to workaround lambda deduction bug on msvc 2013 175 .as_dynamic(); 176 } 177 ); 178 179 THEN("the output only contains items sent while subscribed"){ 180 auto required = rxu::to_vector({ 181 on.next(210, 9), 182 on.next(230, 13), 183 on.next(270, 7), 184 on.next(280, 1), 185 on.next(300, -1), 186 on.next(310, 3), 187 on.next(340, 8), 188 on.next(370, 11), 189 on.next(410, 15), 190 on.next(415, 16), 191 on.completed(460) 192 }); 193 auto actual = res.get_observer().messages(); 194 REQUIRE(required == actual); 195 } 196 197 THEN("there was 1 subscription/unsubscription to the source"){ 198 auto required = rxu::to_vector({ 199 on.subscribe(200, 460) 200 }); 201 auto actual = xs.subscriptions(); 202 REQUIRE(required == actual); 203 } 204 205 } 206 } 207 } 208 209 SCENARIO("take_while, error after", "[take_while][operators]"){ 210 GIVEN("a source"){ 211 auto sc = rxsc::make_test(); 212 auto w = sc.create_worker(); 213 const rxsc::test::messages<int> on; 214 215 std::runtime_error ex("take_while on_error from source"); 216 217 auto xs = sc.make_hot_observable({ 218 on.next(70, 6), 219 on.next(150, 4), 220 on.next(210, 9), 221 on.next(230, 13), 222 on.next(270, 7), 223 on.next(280, 1), 224 on.next(300, -1), 225 on.next(310, 3), 226 on.next(340, 8), 227 on.next(370, 11), 228 on.next(410, 15), 229 on.next(415, 16), 230 on.next(460, 72), 231 on.next(510, 76), 232 on.next(560, 32), 233 on.next(570, -100), 234 on.next(580, -3), 235 on.next(590, 5), 236 on.next(630, 10), 237 on.error(690, ex) 238 }); 239 240 WHEN("all values are taken"){ 241 242 auto res = w.start( __anon6e0104070502() 243 [xs]() { 244 return xs 245 .take_while(not_equal_to(0)) 246 // forget type to workaround lambda deduction bug on msvc 2013 247 .as_dynamic(); 248 } 249 ); 250 251 THEN("the output only contains items sent while subscribed and the error"){ 252 auto required = rxu::to_vector({ 253 on.next(210, 9), 254 on.next(230, 13), 255 on.next(270, 7), 256 on.next(280, 1), 257 on.next(300, -1), 258 on.next(310, 3), 259 on.next(340, 8), 260 on.next(370, 11), 261 on.next(410, 15), 262 on.next(415, 16), 263 on.next(460, 72), 264 on.next(510, 76), 265 on.next(560, 32), 266 on.next(570, -100), 267 on.next(580, -3), 268 on.next(590, 5), 269 on.next(630, 10), 270 on.error(690, ex) 271 }); 272 auto actual = res.get_observer().messages(); 273 REQUIRE(required == actual); 274 } 275 276 THEN("there was 1 subscription/unsubscription to the source"){ 277 auto required = rxu::to_vector({ 278 on.subscribe(200, 690) 279 }); 280 auto actual = xs.subscriptions(); 281 REQUIRE(required == actual); 282 } 283 284 } 285 } 286 } 287 288 SCENARIO("take_while, error same", "[take_while][operators]"){ 289 GIVEN("a source"){ 290 auto sc = rxsc::make_test(); 291 auto w = sc.create_worker(); 292 const rxsc::test::messages<int> on; 293 294 auto xs = sc.make_hot_observable({ 295 on.next(70, 6), 296 on.next(150, 4), 297 on.next(210, 9), 298 on.next(230, 13), 299 on.next(270, 7), 300 on.next(280, 1), 301 on.next(300, -1), 302 on.next(310, 3), 303 on.next(340, 8), 304 on.next(370, 11), 305 on.next(410, 15), 306 on.next(415, 16), 307 on.next(460, 72), 308 on.next(510, 76), 309 on.next(560, 32), 310 on.next(570, -100), 311 on.next(580, -3), 312 on.next(590, 5), 313 on.next(630, 10), 314 on.error(690, std::runtime_error("error in unsubscribed stream")) 315 }); 316 317 WHEN("all but one values are taken"){ 318 319 auto res = w.start( __anon6e0104070602() 320 [xs]() { 321 return xs 322 .take_while(not_equal_to(10)) 323 // forget type to workaround lambda deduction bug on msvc 2013 324 .as_dynamic(); 325 } 326 ); 327 328 THEN("the output only contains items sent while subscribed"){ 329 auto required = rxu::to_vector({ 330 on.next(210, 9), 331 on.next(230, 13), 332 on.next(270, 7), 333 on.next(280, 1), 334 on.next(300, -1), 335 on.next(310, 3), 336 on.next(340, 8), 337 on.next(370, 11), 338 on.next(410, 15), 339 on.next(415, 16), 340 on.next(460, 72), 341 on.next(510, 76), 342 on.next(560, 32), 343 on.next(570, -100), 344 on.next(580, -3), 345 on.next(590, 5), 346 on.completed(630) 347 }); 348 auto actual = res.get_observer().messages(); 349 REQUIRE(required == actual); 350 } 351 352 THEN("there was 1 subscription/unsubscription to the source"){ 353 auto required = rxu::to_vector({ 354 on.subscribe(200, 630) 355 }); 356 auto actual = xs.subscriptions(); 357 REQUIRE(required == actual); 358 } 359 360 } 361 } 362 } 363 364 SCENARIO("take_while, error before", "[take_while][operators]"){ 365 GIVEN("a source"){ 366 auto sc = rxsc::make_test(); 367 auto w = sc.create_worker(); 368 const rxsc::test::messages<int> on; 369 370 auto xs = sc.make_hot_observable({ 371 on.next(70, 6), 372 on.next(150, 4), 373 on.next(210, 9), 374 on.next(230, 13), 375 on.next(270, 7), 376 on.next(280, 1), 377 on.next(300, -1), 378 on.next(310, 3), 379 on.next(340, 8), 380 on.next(370, 11), 381 on.next(410, 15), 382 on.next(415, 16), 383 on.next(460, 72), 384 on.next(510, 76), 385 on.next(560, 32), 386 on.next(570, -100), 387 on.next(580, -3), 388 on.next(590, 5), 389 on.next(630, 10), 390 on.error(690, std::runtime_error("error in unsubscribed stream")) 391 }); 392 393 WHEN("3 values are taken"){ 394 395 auto res = w.start( __anon6e0104070702() 396 [xs]() { 397 return xs 398 .take_while(not_equal_to(1)) 399 // forget type to workaround lambda deduction bug on msvc 2013 400 .as_dynamic(); 401 } 402 ); 403 404 THEN("the output only contains items sent while subscribed"){ 405 auto required = rxu::to_vector({ 406 on.next(210, 9), 407 on.next(230, 13), 408 on.next(270, 7), 409 on.completed(280) 410 }); 411 auto actual = res.get_observer().messages(); 412 REQUIRE(required == actual); 413 } 414 415 THEN("there was 1 subscription/unsubscription to the source"){ 416 auto required = rxu::to_vector({ 417 on.subscribe(200, 280) 418 }); 419 auto actual = xs.subscriptions(); 420 REQUIRE(required == actual); 421 } 422 423 } 424 } 425 } 426 427 SCENARIO("take_while, dispose before", "[take_while][operators]"){ 428 GIVEN("a source"){ 429 auto sc = rxsc::make_test(); 430 auto w = sc.create_worker(); 431 const rxsc::test::messages<int> on; 432 433 auto xs = sc.make_hot_observable({ 434 on.next(70, 6), 435 on.next(150, 4), 436 on.next(210, 9), 437 on.next(230, 13), 438 on.next(270, 7), 439 on.next(280, 1), 440 on.next(300, -1), 441 on.next(310, 3), 442 on.next(340, 8), 443 on.next(370, 11), 444 on.next(410, 15), 445 on.next(415, 16), 446 on.next(460, 72), 447 on.next(510, 76), 448 on.next(560, 32), 449 on.next(570, -100), 450 on.next(580, -3), 451 on.next(590, 5), 452 on.next(630, 10) 453 }); 454 455 WHEN("3 values are taken"){ 456 457 auto res = w.start( __anon6e0104070802() 458 [xs]() { 459 return xs 460 .take_while(not_equal_to(100)) 461 // forget type to workaround lambda deduction bug on msvc 2013 462 .as_dynamic(); 463 }, 464 250 465 ); 466 467 THEN("the output only contains items sent while subscribed"){ 468 auto required = rxu::to_vector({ 469 on.next(210, 9), 470 on.next(230, 13) 471 }); 472 auto actual = res.get_observer().messages(); 473 REQUIRE(required == actual); 474 } 475 476 THEN("there was 1 subscription/unsubscription to the source"){ 477 auto required = rxu::to_vector({ 478 on.subscribe(200, 250) 479 }); 480 auto actual = xs.subscriptions(); 481 REQUIRE(required == actual); 482 } 483 484 } 485 } 486 } 487 488 SCENARIO("take_while, dispose after", "[take_while][operators]"){ 489 GIVEN("a source"){ 490 auto sc = rxsc::make_test(); 491 auto w = sc.create_worker(); 492 const rxsc::test::messages<int> on; 493 494 auto xs = sc.make_hot_observable({ 495 on.next(70, 6), 496 on.next(150, 4), 497 on.next(210, 9), 498 on.next(230, 13), 499 on.next(270, 7), 500 on.next(280, 1), 501 on.next(300, -1), 502 on.next(310, 3), 503 on.next(340, 8), 504 on.next(370, 11), 505 on.next(410, 15), 506 on.next(415, 16), 507 on.next(460, 72), 508 on.next(510, 76), 509 on.next(560, 32), 510 on.next(570, -100), 511 on.next(580, -3), 512 on.next(590, 5), 513 on.next(630, 10) 514 }); 515 516 WHEN("3 values are taken"){ 517 518 auto res = w.start( __anon6e0104070902() 519 [xs]() { 520 return xs 521 .take_while(not_equal_to(1)) 522 // forget type to workaround lambda deduction bug on msvc 2013 523 .as_dynamic(); 524 }, 525 400 526 ); 527 528 THEN("the output only contains items sent while subscribed"){ 529 auto required = rxu::to_vector({ 530 on.next(210, 9), 531 on.next(230, 13), 532 on.next(270, 7), 533 on.completed(280) 534 }); 535 auto actual = res.get_observer().messages(); 536 REQUIRE(required == actual); 537 } 538 539 THEN("there was 1 subscription/unsubscription to the source"){ 540 auto required = rxu::to_vector({ 541 on.subscribe(200, 280) 542 }); 543 auto actual = xs.subscriptions(); 544 REQUIRE(required == actual); 545 } 546 547 } 548 } 549 } 550 551