1 #include "../test.h" 2 #include <rxcpp/operators/rx-map.hpp> 3 #include <rxcpp/operators/rx-take_until.hpp> 4 5 SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ 6 GIVEN("2 sources"){ 7 auto sc = rxsc::make_test(); 8 auto w = sc.create_worker(); 9 const rxsc::test::messages<int> on; 10 11 auto xs = sc.make_hot_observable({ 12 on.next(150, 1), 13 on.next(210, 2), 14 on.next(220, 3), 15 on.next(230, 4), 16 on.next(240, 5), 17 on.completed(250) 18 }); 19 20 auto ys = sc.make_hot_observable({ 21 on.next(150, 1), 22 on.next(225, 99), 23 on.completed(230) 24 }); 25 26 WHEN("one is taken until the other emits a marble"){ 27 28 auto res = w.start( __anon56d2ed3a0102() 29 [xs, ys]() { 30 return xs 31 | rxo::take_until(ys) 32 // forget type to workaround lambda deduction bug on msvc 2013 33 | rxo::as_dynamic(); 34 } 35 ); 36 37 THEN("the output only contains items sent while subscribed"){ 38 auto required = rxu::to_vector({ 39 on.next(210, 2), 40 on.next(220, 3), 41 on.completed(225) 42 }); 43 auto actual = res.get_observer().messages(); 44 REQUIRE(required == actual); 45 } 46 47 THEN("there was 1 subscription/unsubscription to the source"){ 48 auto required = rxu::to_vector({ 49 on.subscribe(200, 225) 50 }); 51 auto actual = xs.subscriptions(); 52 REQUIRE(required == actual); 53 } 54 55 THEN("there was 1 subscription/unsubscription to the trigger"){ 56 auto required = rxu::to_vector({ 57 on.subscribe(200, 225) 58 }); 59 auto actual = ys.subscriptions(); 60 REQUIRE(required == actual); 61 } 62 63 } 64 } 65 } 66 67 SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){ 68 GIVEN("2 sources"){ 69 auto sc = rxsc::make_test(); 70 auto w = sc.create_worker(); 71 const rxsc::test::messages<int> on; 72 73 auto l = sc.make_hot_observable({ 74 on.next(150, 1), 75 on.next(210, 2), 76 on.next(220, 3), 77 on.next(230, 4), 78 on.next(240, 5), 79 on.completed(250) 80 }); 81 82 auto r = sc.make_hot_observable({ 83 on.next(150, 1), 84 on.next(225, 99), 85 on.completed(230) 86 }); 87 88 WHEN("one is taken until the other emits a marble"){ 89 90 auto res = w.start( __anon56d2ed3a0202() 91 [l, r]() { 92 return l 93 .take_until(r) 94 // forget type to workaround lambda deduction bug on msvc 2013 95 .as_dynamic(); 96 } 97 ); 98 99 THEN("the output only contains items sent while subscribed"){ 100 auto required = rxu::to_vector({ 101 on.next(210, 2), 102 on.next(220, 3), 103 on.completed(225) 104 }); 105 auto actual = res.get_observer().messages(); 106 REQUIRE(required == actual); 107 } 108 109 THEN("there was 1 subscription/unsubscription to the source"){ 110 auto required = rxu::to_vector({ 111 on.subscribe(200, 225) 112 }); 113 auto actual = l.subscriptions(); 114 REQUIRE(required == actual); 115 } 116 117 THEN("there was 1 subscription/unsubscription to the trigger"){ 118 auto required = rxu::to_vector({ 119 on.subscribe(200, 225) 120 }); 121 auto actual = r.subscriptions(); 122 REQUIRE(required == actual); 123 } 124 125 } 126 } 127 } 128 129 SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){ 130 GIVEN("2 sources"){ 131 auto sc = rxsc::make_test(); 132 auto w = sc.create_worker(); 133 const rxsc::test::messages<int> on; 134 135 std::runtime_error ex("take_until on_error from source"); 136 137 auto l = sc.make_hot_observable({ 138 on.next(150, 1), 139 on.next(210, 2), 140 on.next(220, 3), 141 on.next(230, 4), 142 on.next(240, 5), 143 on.completed(250) 144 }); 145 146 auto r = sc.make_hot_observable({ 147 on.next(150, 1), 148 on.error(225, ex) 149 }); 150 151 WHEN("one is taken until the other emits a marble"){ 152 153 auto res = w.start( __anon56d2ed3a0302() 154 [l, r]() { 155 return l 156 .take_until(r) 157 // forget type to workaround lambda deduction bug on msvc 2013 158 .as_dynamic(); 159 } 160 ); 161 162 THEN("the output only contains items sent while subscribed"){ 163 auto required = rxu::to_vector({ 164 on.next(210, 2), 165 on.next(220, 3), 166 on.error(225, ex) 167 }); 168 auto actual = res.get_observer().messages(); 169 REQUIRE(required == actual); 170 } 171 172 THEN("there was 1 subscription/unsubscription to the source"){ 173 auto required = rxu::to_vector({ 174 on.subscribe(200, 225) 175 }); 176 auto actual = l.subscriptions(); 177 REQUIRE(required == actual); 178 } 179 180 THEN("there was 1 subscription/unsubscription to the trigger"){ 181 auto required = rxu::to_vector({ 182 on.subscribe(200, 225) 183 }); 184 auto actual = r.subscriptions(); 185 REQUIRE(required == actual); 186 } 187 188 } 189 } 190 } 191 192 SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){ 193 GIVEN("2 sources"){ 194 auto sc = rxsc::make_test(); 195 auto w = sc.create_worker(); 196 const rxsc::test::messages<int> on; 197 198 auto l = sc.make_hot_observable({ 199 on.next(150, 1), 200 on.next(210, 2), 201 on.next(220, 3), 202 on.next(230, 4), 203 on.next(240, 5), 204 on.completed(250) 205 }); 206 207 auto r = sc.make_hot_observable({ 208 on.next(150, 1), 209 on.completed(225) 210 }); 211 212 WHEN("one is taken until the other emits a marble"){ 213 214 auto res = w.start( __anon56d2ed3a0402() 215 [l, r]() { 216 return l 217 .take_until(r) 218 // forget type to workaround lambda deduction bug on msvc 2013 219 .as_dynamic(); 220 } 221 ); 222 223 THEN("the output only contains items sent while subscribed"){ 224 auto required = rxu::to_vector({ 225 on.next(210, 2), 226 on.next(220, 3), 227 on.next(230, 4), 228 on.next(240, 5), 229 on.completed(250) 230 }); 231 auto actual = res.get_observer().messages(); 232 REQUIRE(required == actual); 233 } 234 235 THEN("there was 1 subscription/unsubscription to the source"){ 236 auto required = rxu::to_vector({ 237 on.subscribe(200, 250) 238 }); 239 auto actual = l.subscriptions(); 240 REQUIRE(required == actual); 241 } 242 243 THEN("there was 1 subscription/unsubscription to the trigger"){ 244 auto required = rxu::to_vector({ 245 on.subscribe(200, 225) 246 }); 247 auto actual = r.subscriptions(); 248 REQUIRE(required == actual); 249 } 250 251 } 252 } 253 } 254 255 SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){ 256 GIVEN("2 sources"){ 257 auto sc = rxsc::make_test(); 258 auto w = sc.create_worker(); 259 const rxsc::test::messages<int> on; 260 261 auto l = sc.make_hot_observable({ 262 on.next(150, 1), 263 on.next(210, 2), 264 on.next(220, 3), 265 on.next(230, 4), 266 on.next(240, 5), 267 on.completed(250) 268 }); 269 270 auto r = sc.make_hot_observable({ 271 on.next(150, 1) 272 }); 273 274 WHEN("one is taken until the other emits a marble"){ 275 276 auto res = w.start( __anon56d2ed3a0502() 277 [l, r]() { 278 return l 279 .take_until(r) 280 // forget type to workaround lambda deduction bug on msvc 2013 281 .as_dynamic(); 282 } 283 ); 284 285 THEN("the output only contains items sent while subscribed"){ 286 auto required = rxu::to_vector({ 287 on.next(210, 2), 288 on.next(220, 3), 289 on.next(230, 4), 290 on.next(240, 5), 291 on.completed(250) 292 }); 293 auto actual = res.get_observer().messages(); 294 REQUIRE(required == actual); 295 } 296 297 THEN("there was 1 subscription/unsubscription to the source"){ 298 auto required = rxu::to_vector({ 299 on.subscribe(200, 250) 300 }); 301 auto actual = l.subscriptions(); 302 REQUIRE(required == actual); 303 } 304 305 THEN("there was 1 subscription/unsubscription to the trigger"){ 306 auto required = rxu::to_vector({ 307 on.subscribe(200, 250) 308 }); 309 auto actual = r.subscriptions(); 310 REQUIRE(required == actual); 311 } 312 313 } 314 } 315 } 316 317 SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){ 318 GIVEN("2 sources"){ 319 auto sc = rxsc::make_test(); 320 auto w = sc.create_worker(); 321 const rxsc::test::messages<int> on; 322 323 auto l = sc.make_hot_observable({ 324 on.next(150, 1) 325 }); 326 327 auto r = sc.make_hot_observable({ 328 on.next(150, 1), 329 on.next(225, 2), //! 330 on.completed(250) 331 }); 332 333 WHEN("one is taken until the other emits a marble"){ 334 335 auto res = w.start( __anon56d2ed3a0602() 336 [l, r]() { 337 return l 338 .take_until(r) 339 // forget type to workaround lambda deduction bug on msvc 2013 340 .as_dynamic(); 341 } 342 ); 343 344 THEN("the output only contains items sent while subscribed"){ 345 auto required = rxu::to_vector({ 346 on.completed(225) 347 }); 348 auto actual = res.get_observer().messages(); 349 REQUIRE(required == actual); 350 } 351 352 THEN("there was 1 subscription/unsubscription to the source"){ 353 auto required = rxu::to_vector({ 354 on.subscribe(200, 225) 355 }); 356 auto actual = l.subscriptions(); 357 REQUIRE(required == actual); 358 } 359 360 THEN("there was 1 subscription/unsubscription to the trigger"){ 361 auto required = rxu::to_vector({ 362 on.subscribe(200, 225) 363 }); 364 auto actual = r.subscriptions(); 365 REQUIRE(required == actual); 366 } 367 368 } 369 } 370 } 371 372 SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){ 373 GIVEN("2 sources"){ 374 auto sc = rxsc::make_test(); 375 auto w = sc.create_worker(); 376 const rxsc::test::messages<int> on; 377 378 std::runtime_error ex("take_until on_error from source"); 379 380 auto l = sc.make_hot_observable({ 381 on.next(150, 1) 382 }); 383 384 auto r = sc.make_hot_observable({ 385 on.next(150, 1), 386 on.error(225, ex) 387 }); 388 389 WHEN("one is taken until the other emits a marble"){ 390 391 auto res = w.start( __anon56d2ed3a0702() 392 [l, r]() { 393 return l 394 .take_until(r) 395 // forget type to workaround lambda deduction bug on msvc 2013 396 .as_dynamic(); 397 } 398 ); 399 400 THEN("the output only contains items sent while subscribed"){ 401 auto required = rxu::to_vector({ 402 on.error(225, ex) 403 }); 404 auto actual = res.get_observer().messages(); 405 REQUIRE(required == actual); 406 } 407 408 THEN("there was 1 subscription/unsubscription to the source"){ 409 auto required = rxu::to_vector({ 410 on.subscribe(200, 225) 411 }); 412 auto actual = l.subscriptions(); 413 REQUIRE(required == actual); 414 } 415 416 THEN("there was 1 subscription/unsubscription to the trigger"){ 417 auto required = rxu::to_vector({ 418 on.subscribe(200, 225) 419 }); 420 auto actual = r.subscriptions(); 421 REQUIRE(required == actual); 422 } 423 424 } 425 } 426 } 427 428 SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){ 429 GIVEN("2 sources"){ 430 auto sc = rxsc::make_test(); 431 auto w = sc.create_worker(); 432 const rxsc::test::messages<int> on; 433 434 auto l = sc.make_hot_observable({ 435 on.next(150, 1) 436 }); 437 438 auto r = sc.make_hot_observable({ 439 on.next(150, 1), 440 on.completed(225) 441 }); 442 443 WHEN("one is taken until the other emits a marble"){ 444 445 auto res = w.start( __anon56d2ed3a0802() 446 [l, r]() { 447 return l 448 .take_until(r) 449 // forget type to workaround lambda deduction bug on msvc 2013 450 .as_dynamic(); 451 } 452 ); 453 454 THEN("the output only contains items sent while subscribed"){ 455 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 456 auto actual = res.get_observer().messages(); 457 REQUIRE(required == actual); 458 } 459 460 THEN("there was 1 subscription/unsubscription to the source"){ 461 auto required = rxu::to_vector({ 462 on.subscribe(200, 1000 /* can't dispose prematurely, could be in flight to dispatch OnError */) 463 }); 464 auto actual = l.subscriptions(); 465 REQUIRE(required == actual); 466 } 467 468 THEN("there was 1 subscription/unsubscription to the trigger"){ 469 auto required = rxu::to_vector({ 470 on.subscribe(200, 225) 471 }); 472 auto actual = r.subscriptions(); 473 REQUIRE(required == actual); 474 } 475 476 } 477 } 478 } 479 480 SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){ 481 GIVEN("2 sources"){ 482 auto sc = rxsc::make_test(); 483 auto w = sc.create_worker(); 484 const rxsc::test::messages<int> on; 485 486 auto l = sc.make_hot_observable({ 487 on.next(150, 1) 488 }); 489 490 auto r = sc.make_hot_observable({ 491 on.next(150, 1) 492 }); 493 494 WHEN("one is taken until the other emits a marble"){ 495 496 auto res = w.start( __anon56d2ed3a0902() 497 [l, r]() { 498 return l 499 .take_until(r) 500 // forget type to workaround lambda deduction bug on msvc 2013 501 .as_dynamic(); 502 } 503 ); 504 505 THEN("the output only contains items sent while subscribed"){ 506 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 507 auto actual = res.get_observer().messages(); 508 REQUIRE(required == actual); 509 } 510 511 THEN("there was 1 subscription/unsubscription to the source"){ 512 auto required = rxu::to_vector({ 513 on.subscribe(200, 1000) 514 }); 515 auto actual = l.subscriptions(); 516 REQUIRE(required == actual); 517 } 518 519 THEN("there was 1 subscription/unsubscription to the trigger"){ 520 auto required = rxu::to_vector({ 521 on.subscribe(200, 1000) 522 }); 523 auto actual = r.subscriptions(); 524 REQUIRE(required == actual); 525 } 526 527 } 528 } 529 } 530 531 SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){ 532 GIVEN("2 sources"){ 533 auto sc = rxsc::make_test(); 534 auto w = sc.create_worker(); 535 const rxsc::test::messages<int> on; 536 537 auto l = sc.make_hot_observable({ 538 on.next(150, 1), 539 on.next(230, 2), 540 on.completed(240) 541 }); 542 543 auto r = sc.make_hot_observable({ 544 on.next(150, 1), 545 on.next(210, 2), //! 546 on.completed(220) 547 }); 548 549 WHEN("one is taken until the other emits a marble"){ 550 551 auto res = w.start( __anon56d2ed3a0a02() 552 [l, r]() { 553 return l 554 .take_until(r) 555 // forget type to workaround lambda deduction bug on msvc 2013 556 .as_dynamic(); 557 } 558 ); 559 560 THEN("the output only contains items sent while subscribed"){ 561 auto required = rxu::to_vector({ 562 on.completed(210) 563 }); 564 auto actual = res.get_observer().messages(); 565 REQUIRE(required == actual); 566 } 567 568 THEN("there was 1 subscription/unsubscription to the source"){ 569 auto required = rxu::to_vector({ 570 on.subscribe(200, 210) 571 }); 572 auto actual = l.subscriptions(); 573 REQUIRE(required == actual); 574 } 575 576 THEN("there was 1 subscription/unsubscription to the trigger"){ 577 auto required = rxu::to_vector({ 578 on.subscribe(200, 210) 579 }); 580 auto actual = r.subscriptions(); 581 REQUIRE(required == actual); 582 } 583 584 } 585 } 586 } 587 588 SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){ 589 GIVEN("2 sources"){ 590 auto sc = rxsc::make_test(); 591 auto w = sc.create_worker(); 592 const rxsc::test::messages<int> on; 593 594 bool sourceNotDisposed = false; 595 596 auto l = sc.make_hot_observable({ 597 on.next(150, 1), 598 on.error(215, std::runtime_error("error in unsubscribed stream")), // should not come 599 on.completed(240) 600 }); 601 602 auto r = sc.make_hot_observable({ 603 on.next(150, 1), 604 on.next(210, 2), //! 605 on.completed(220) 606 }); 607 608 WHEN("one is taken until the other emits a marble"){ 609 610 auto res = w.start( __anon56d2ed3a0b02() 611 [l, r, &sourceNotDisposed]() { 612 return l 613 .map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v; }) 614 .take_until(r) 615 // forget type to workaround lambda deduction bug on msvc 2013 616 .as_dynamic(); 617 } 618 ); 619 620 THEN("the output only contains items sent while subscribed"){ 621 auto required = rxu::to_vector({ 622 on.completed(210) 623 }); 624 auto actual = res.get_observer().messages(); 625 REQUIRE(required == actual); 626 } 627 628 THEN("signal disposed"){ 629 auto required = false; 630 auto actual = sourceNotDisposed; 631 REQUIRE(required == actual); 632 } 633 634 } 635 } 636 } 637 638 SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){ 639 GIVEN("2 sources"){ 640 auto sc = rxsc::make_test(); 641 auto w = sc.create_worker(); 642 const rxsc::test::messages<int> on; 643 644 bool signalNotDisposed = false; 645 646 auto l = sc.make_hot_observable({ 647 on.next(150, 1), 648 on.next(230, 2), 649 on.completed(240) 650 }); 651 652 auto r = sc.make_hot_observable({ 653 on.next(150, 1), 654 on.next(250, 2), 655 on.completed(260) 656 }); 657 658 WHEN("one is taken until the other emits a marble"){ 659 660 auto res = w.start( __anon56d2ed3a0d02() 661 [l, r, &signalNotDisposed]() { 662 return l 663 .take_until(r 664 .map([&signalNotDisposed](int v){signalNotDisposed = true; return v; })) 665 // forget type to workaround lambda deduction bug on msvc 2013 666 .as_dynamic(); 667 } 668 ); 669 670 THEN("the output only contains items sent while subscribed"){ 671 auto required = rxu::to_vector({ 672 on.next(230, 2), 673 on.completed(240) 674 }); 675 auto actual = res.get_observer().messages(); 676 REQUIRE(required == actual); 677 } 678 679 THEN("signal disposed"){ 680 auto required = false; 681 auto actual = signalNotDisposed; 682 REQUIRE(required == actual); 683 } 684 685 } 686 } 687 } 688 689 SCENARIO("take_until, error some", "[take_until][take][operators]"){ 690 GIVEN("2 sources"){ 691 auto sc = rxsc::make_test(); 692 auto w = sc.create_worker(); 693 const rxsc::test::messages<int> on; 694 695 std::runtime_error ex("take_until on_error from source"); 696 697 auto l = sc.make_hot_observable({ 698 on.next(150, 1), 699 on.error(225, ex) 700 }); 701 702 auto r = sc.make_hot_observable({ 703 on.next(150, 1), 704 on.next(240, 2) 705 }); 706 707 WHEN("one is taken until the other emits a marble"){ 708 709 auto res = w.start( __anon56d2ed3a0f02() 710 [l, r]() { 711 return l 712 .take_until(r) 713 // forget type to workaround lambda deduction bug on msvc 2013 714 .as_dynamic(); 715 } 716 ); 717 718 THEN("the output only contains items sent while subscribed"){ 719 auto required = rxu::to_vector({ 720 on.error(225, ex) 721 }); 722 auto actual = res.get_observer().messages(); 723 REQUIRE(required == actual); 724 } 725 726 THEN("there was 1 subscription/unsubscription to the source"){ 727 auto required = rxu::to_vector({ 728 on.subscribe(200, 225) 729 }); 730 auto actual = l.subscriptions(); 731 REQUIRE(required == actual); 732 } 733 734 THEN("there was 1 subscription/unsubscription to the trigger"){ 735 auto required = rxu::to_vector({ 736 on.subscribe(200, 225) 737 }); 738 auto actual = r.subscriptions(); 739 REQUIRE(required == actual); 740 } 741 742 } 743 } 744 } 745 746 SCENARIO("take_until trigger on time point", "[take_until][take][operators]"){ 747 GIVEN("a source and a time point"){ 748 auto sc = rxsc::make_test(); 749 auto so = rx::synchronize_in_one_worker(sc); 750 auto w = sc.create_worker(); 751 const rxsc::test::messages<int> on; 752 753 auto xs = sc.make_hot_observable({ 754 on.next(150, 1), 755 on.next(210, 2), 756 on.next(220, 3), 757 on.next(230, 4), 758 on.next(240, 5), 759 on.completed(250) 760 }); 761 762 auto t = sc.to_time_point(225); 763 764 WHEN("invoked with a time point"){ 765 766 auto res = w.start( __anon56d2ed3a1002() 767 [&]() { 768 return xs 769 | rxo::take_until(t, so) 770 // forget type to workaround lambda deduction bug on msvc 2013 771 | rxo::as_dynamic(); 772 } 773 ); 774 775 THEN("the output only contains items sent while subscribed"){ 776 auto required = rxu::to_vector({ 777 on.next(211, 2), 778 on.next(221, 3), 779 on.completed(226) 780 }); 781 auto actual = res.get_observer().messages(); 782 REQUIRE(required == actual); 783 } 784 785 THEN("there was 1 subscription/unsubscription to the source"){ 786 auto required = rxu::to_vector({ 787 on.subscribe(200, 226) 788 }); 789 auto actual = xs.subscriptions(); 790 REQUIRE(required == actual); 791 } 792 } 793 } 794 } 795