1 #include "../test.h" 2 #include "rxcpp/operators/rx-with_latest_from.hpp" 3 4 SCENARIO("with_latest_from interleaved with tail", "[with_latest_from][join][operators]"){ 5 GIVEN("2 hot observables of ints."){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto o1 = sc.make_hot_observable({ 11 on.next(150, 1), 12 on.next(215, 2), 13 on.next(225, 4), 14 on.completed(230) 15 }); 16 17 auto o2 = sc.make_hot_observable({ 18 on.next(150, 1), 19 on.next(220, 3), 20 on.next(230, 5), 21 on.next(235, 6), 22 on.next(240, 7), 23 on.completed(250) 24 }); 25 26 WHEN("each int is combined with the latest from the other source"){ 27 28 auto res = w.start( __anon9ee5cb450102() 29 [&]() { 30 return o2 31 .with_latest_from( 32 [](int v2, int v1){ 33 return v2 + v1; 34 }, 35 o1 36 ) 37 // forget type to workaround lambda deduction bug on msvc 2013 38 .as_dynamic(); 39 } 40 ); 41 42 THEN("the output contains combined ints"){ 43 auto required = rxu::to_vector({ 44 on.next(220, 2 + 3), 45 on.next(230, 4 + 5), 46 on.next(235, 4 + 6), 47 on.next(240, 4 + 7), 48 on.completed(250) 49 }); 50 auto actual = res.get_observer().messages(); 51 REQUIRE(required == actual); 52 } 53 54 THEN("there was one subscription and one unsubscription to the o1"){ 55 auto required = rxu::to_vector({ 56 on.subscribe(200, 230) 57 }); 58 auto actual = o1.subscriptions(); 59 REQUIRE(required == actual); 60 } 61 62 THEN("there was one subscription and one unsubscription to the o2"){ 63 auto required = rxu::to_vector({ 64 on.subscribe(200, 250) 65 }); 66 auto actual = o2.subscriptions(); 67 REQUIRE(required == actual); 68 } 69 } 70 } 71 } 72 73 SCENARIO("with_latest_from consecutive", "[with_latest_from][join][operators]"){ 74 GIVEN("2 hot observables of ints."){ 75 auto sc = rxsc::make_test(); 76 auto w = sc.create_worker(); 77 const rxsc::test::messages<int> on; 78 79 auto o1 = sc.make_hot_observable({ 80 on.next(150, 1), 81 on.next(215, 2), 82 on.next(225, 4), 83 on.completed(230) 84 }); 85 86 auto o2 = sc.make_hot_observable({ 87 on.next(150, 1), 88 on.next(235, 6), 89 on.next(240, 7), 90 on.completed(250) 91 }); 92 93 WHEN("each int is combined with the latest from the other source"){ 94 95 auto res = w.start( __anon9ee5cb450302() 96 [&]() { 97 return o2 98 .with_latest_from( 99 [](int v2, int v1){ 100 return v2 + v1; 101 }, 102 o1 103 ) 104 // forget type to workaround lambda deduction bug on msvc 2013 105 .as_dynamic(); 106 } 107 ); 108 109 THEN("the output contains combined ints"){ 110 auto required = rxu::to_vector({ 111 on.next(235, 4 + 6), 112 on.next(240, 4 + 7), 113 on.completed(250) 114 }); 115 auto actual = res.get_observer().messages(); 116 REQUIRE(required == actual); 117 } 118 119 THEN("there was one subscription and one unsubscription to the o1"){ 120 auto required = rxu::to_vector({ 121 on.subscribe(200, 230) 122 }); 123 auto actual = o1.subscriptions(); 124 REQUIRE(required == actual); 125 } 126 127 THEN("there was one subscription and one unsubscription to the o2"){ 128 auto required = rxu::to_vector({ 129 on.subscribe(200, 250) 130 }); 131 auto actual = o2.subscriptions(); 132 REQUIRE(required == actual); 133 } 134 } 135 } 136 } 137 138 SCENARIO("with_latest_from consecutive ends with error left", "[with_latest_from][join][operators]"){ 139 GIVEN("2 hot observables of ints."){ 140 auto sc = rxsc::make_test(); 141 auto w = sc.create_worker(); 142 const rxsc::test::messages<int> on; 143 144 std::runtime_error ex("with_latest_from on_error from source"); 145 146 auto o1 = sc.make_hot_observable({ 147 on.next(150, 1), 148 on.next(215, 2), 149 on.next(225, 4), 150 on.error(230, ex) 151 }); 152 153 auto o2 = sc.make_hot_observable({ 154 on.next(150, 1), 155 on.next(235, 6), 156 on.next(240, 7), 157 on.completed(250) 158 }); 159 160 WHEN("each int is combined with the latest from the other source"){ 161 162 auto res = w.start( __anon9ee5cb450502() 163 [&]() { 164 return o2 165 .with_latest_from( 166 [](int v2, int v1){ 167 return v2 + v1; 168 }, 169 o1 170 ) 171 // forget type to workaround lambda deduction bug on msvc 2013 172 .as_dynamic(); 173 } 174 ); 175 176 THEN("the output contains only an error"){ 177 auto required = rxu::to_vector({ 178 on.error(230, ex) 179 }); 180 auto actual = res.get_observer().messages(); 181 REQUIRE(required == actual); 182 } 183 184 THEN("there was one subscription and one unsubscription to the o1"){ 185 auto required = rxu::to_vector({ 186 on.subscribe(200, 230) 187 }); 188 auto actual = o1.subscriptions(); 189 REQUIRE(required == actual); 190 } 191 192 THEN("there was one subscription and one unsubscription to the o2"){ 193 auto required = rxu::to_vector({ 194 on.subscribe(200, 230) 195 }); 196 auto actual = o2.subscriptions(); 197 REQUIRE(required == actual); 198 } 199 } 200 } 201 } 202 203 SCENARIO("with_latest_from consecutive ends with error right", "[with_latest_from][join][operators]"){ 204 GIVEN("2 hot observables of ints."){ 205 auto sc = rxsc::make_test(); 206 auto w = sc.create_worker(); 207 const rxsc::test::messages<int> on; 208 209 std::runtime_error ex("with_latest_from on_error from source"); 210 211 auto o1 = sc.make_hot_observable({ 212 on.next(150, 1), 213 on.next(215, 2), 214 on.next(225, 4), 215 on.completed(250) 216 }); 217 218 auto o2 = sc.make_hot_observable({ 219 on.next(150, 1), 220 on.next(235, 6), 221 on.next(240, 7), 222 on.error(245, ex) 223 }); 224 225 WHEN("each int is combined with the latest from the other source"){ 226 227 auto res = w.start( __anon9ee5cb450702() 228 [&]() { 229 return o2 230 .with_latest_from( 231 [](int v2, int v1){ 232 return v2 + v1; 233 }, 234 o1 235 ) 236 // forget type to workaround lambda deduction bug on msvc 2013 237 .as_dynamic(); 238 } 239 ); 240 241 THEN("the output contains combined ints followed by an error"){ 242 auto required = rxu::to_vector({ 243 on.next(235, 4 + 6), 244 on.next(240, 4 + 7), 245 on.error(245, ex) 246 }); 247 auto actual = res.get_observer().messages(); 248 REQUIRE(required == actual); 249 } 250 251 THEN("there was one subscription and one unsubscription to the o1"){ 252 auto required = rxu::to_vector({ 253 on.subscribe(200, 245) 254 }); 255 auto actual = o1.subscriptions(); 256 REQUIRE(required == actual); 257 } 258 259 THEN("there was one subscription and one unsubscription to the o2"){ 260 auto required = rxu::to_vector({ 261 on.subscribe(200, 245) 262 }); 263 auto actual = o2.subscriptions(); 264 REQUIRE(required == actual); 265 } 266 } 267 } 268 } 269 270 SCENARIO("with_latest_from never N", "[with_latest_from][join][operators]"){ 271 GIVEN("N never completed hot observables of ints."){ 272 auto sc = rxsc::make_test(); 273 auto w = sc.create_worker(); 274 const rxsc::test::messages<int> on; 275 276 const int N = 4; 277 278 std::vector<rxcpp::test::testable_observable<int>> n; 279 for (int i = 0; i < N; ++i) { 280 n.push_back( 281 sc.make_hot_observable({ 282 on.next(150, 1) 283 }) 284 ); 285 } 286 287 WHEN("each int is combined with the latest from the other source"){ 288 289 auto res = w.start( __anon9ee5cb450902() 290 [&]() { 291 return n[0] 292 .with_latest_from( 293 [](int v0, int v1, int v2, int v3){ 294 return v0 + v1 + v2 + v3; 295 }, 296 n[1], n[2], n[3] 297 ) 298 // forget type to workaround lambda deduction bug on msvc 2013 299 .as_dynamic(); 300 } 301 ); 302 303 THEN("the output is empty"){ 304 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 305 auto actual = res.get_observer().messages(); 306 REQUIRE(required == actual); 307 } 308 309 THEN("there was one subscription and one unsubscription to each observable"){ 310 __anon9ee5cb450b02(rxcpp::test::testable_observable<int> &s)311 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){ 312 auto required = rxu::to_vector({ 313 on.subscribe(200, 1000) 314 }); 315 auto actual = s.subscriptions(); 316 REQUIRE(required == actual); 317 }); 318 } 319 } 320 } 321 } 322 323 SCENARIO("with_latest_from empty N", "[with_latest_from][join][operators]"){ 324 GIVEN("N empty hot observables of ints."){ 325 auto sc = rxsc::make_test(); 326 auto w = sc.create_worker(); 327 const rxsc::test::messages<int> on; 328 329 const int N = 4; 330 331 std::vector<rxcpp::test::testable_observable<int>> e; 332 for (int i = 0; i < N; ++i) { 333 e.push_back( 334 sc.make_hot_observable({ 335 on.next(150, 1), 336 on.completed(210 + 10 * i) 337 }) 338 ); 339 } 340 341 WHEN("each int is combined with the latest from the other source"){ 342 343 auto res = w.start( __anon9ee5cb450c02() 344 [&]() { 345 return e[0] 346 .with_latest_from( 347 [](int v0, int v1, int v2, int v3){ 348 return v0 + v1 + v2 + v3; 349 }, 350 e[1], e[2], e[3] 351 ) 352 // forget type to workaround lambda deduction bug on msvc 2013 353 .as_dynamic(); 354 } 355 ); 356 357 THEN("the output contains only complete message"){ 358 auto required = rxu::to_vector({ 359 on.completed(200 + 10 * N) 360 }); 361 auto actual = res.get_observer().messages(); 362 REQUIRE(required == actual); 363 } 364 365 THEN("there was one subscription and one unsubscription to each observable"){ 366 367 int i = 0; __anon9ee5cb450e02(rxcpp::test::testable_observable<int> &s)368 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ 369 auto required = rxu::to_vector({ 370 on.subscribe(200, 200 + 10 * ++i) 371 }); 372 auto actual = s.subscriptions(); 373 REQUIRE(required == actual); 374 }); 375 } 376 } 377 } 378 } 379 380 SCENARIO("with_latest_from never/empty", "[with_latest_from][join][operators]"){ 381 GIVEN("2 hot observables of ints."){ 382 auto sc = rxsc::make_test(); 383 auto w = sc.create_worker(); 384 const rxsc::test::messages<int> on; 385 386 auto n = sc.make_hot_observable({ 387 on.next(150, 1) 388 }); 389 390 auto e = sc.make_hot_observable({ 391 on.next(150, 1), 392 on.completed(210) 393 }); 394 395 WHEN("each int is combined with the latest from the other source"){ 396 397 auto res = w.start( __anon9ee5cb450f02() 398 [&]() { 399 return n 400 .with_latest_from( 401 [](int v2, int v1){ 402 return v2 + v1; 403 }, 404 e 405 ) 406 // forget type to workaround lambda deduction bug on msvc 2013 407 .as_dynamic(); 408 } 409 ); 410 411 THEN("the output is empty"){ 412 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 413 auto actual = res.get_observer().messages(); 414 REQUIRE(required == actual); 415 } 416 417 THEN("there was one subscription and one unsubscription to the n"){ 418 auto required = rxu::to_vector({ 419 on.subscribe(200, 1000) 420 }); 421 auto actual = n.subscriptions(); 422 REQUIRE(required == actual); 423 } 424 425 THEN("there was one subscription and one unsubscription to the e"){ 426 auto required = rxu::to_vector({ 427 on.subscribe(200, 210) 428 }); 429 auto actual = e.subscriptions(); 430 REQUIRE(required == actual); 431 } 432 } 433 } 434 } 435 436 SCENARIO("with_latest_from empty/never", "[with_latest_from][join][operators]"){ 437 GIVEN("2 hot observables of ints."){ 438 auto sc = rxsc::make_test(); 439 auto w = sc.create_worker(); 440 const rxsc::test::messages<int> on; 441 442 auto e = sc.make_hot_observable({ 443 on.next(150, 1), 444 on.completed(210) 445 }); 446 447 auto n = sc.make_hot_observable({ 448 on.next(150, 1) 449 }); 450 451 WHEN("each int is combined with the latest from the other source"){ 452 453 auto res = w.start( __anon9ee5cb451102() 454 [&]() { 455 return e 456 .with_latest_from( 457 [](int v2, int v1){ 458 return v2 + v1; 459 }, 460 n 461 ) 462 // forget type to workaround lambda deduction bug on msvc 2013 463 .as_dynamic(); 464 } 465 ); 466 467 THEN("the output is empty"){ 468 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 469 auto actual = res.get_observer().messages(); 470 REQUIRE(required == actual); 471 } 472 473 THEN("there was one subscription and one unsubscription to the e"){ 474 auto required = rxu::to_vector({ 475 on.subscribe(200, 210) 476 }); 477 auto actual = e.subscriptions(); 478 REQUIRE(required == actual); 479 } 480 481 THEN("there was one subscription and one unsubscription to the n"){ 482 auto required = rxu::to_vector({ 483 on.subscribe(200, 1000) 484 }); 485 auto actual = n.subscriptions(); 486 REQUIRE(required == actual); 487 } 488 } 489 } 490 } 491 492 SCENARIO("with_latest_from empty/return", "[with_latest_from][join][operators]"){ 493 GIVEN("2 hot observables of ints."){ 494 auto sc = rxsc::make_test(); 495 auto w = sc.create_worker(); 496 const rxsc::test::messages<int> on; 497 498 auto e = sc.make_hot_observable({ 499 on.next(150, 1), 500 on.completed(210) 501 }); 502 503 auto o = sc.make_hot_observable({ 504 on.next(150, 1), 505 on.next(215, 2), 506 on.completed(220) 507 }); 508 509 WHEN("each int is combined with the latest from the other source"){ 510 511 auto res = w.start( __anon9ee5cb451302() 512 [&]() { 513 return e 514 .with_latest_from( 515 [](int v2, int v1){ 516 return v2 + v1; 517 }, 518 o 519 ) 520 // forget type to workaround lambda deduction bug on msvc 2013 521 .as_dynamic(); 522 } 523 ); 524 525 THEN("the output contains only complete message"){ 526 auto required = rxu::to_vector({ 527 on.completed(220) 528 }); 529 auto actual = res.get_observer().messages(); 530 REQUIRE(required == actual); 531 } 532 533 THEN("there was one subscription and one unsubscription to the e"){ 534 auto required = rxu::to_vector({ 535 on.subscribe(200, 210) 536 }); 537 auto actual = e.subscriptions(); 538 REQUIRE(required == actual); 539 } 540 541 THEN("there was one subscription and one unsubscription to the o"){ 542 auto required = rxu::to_vector({ 543 on.subscribe(200, 220) 544 }); 545 auto actual = o.subscriptions(); 546 REQUIRE(required == actual); 547 } 548 } 549 } 550 } 551 552 SCENARIO("with_latest_from return/empty", "[with_latest_from][join][operators]"){ 553 GIVEN("2 hot observables of ints."){ 554 auto sc = rxsc::make_test(); 555 auto w = sc.create_worker(); 556 const rxsc::test::messages<int> on; 557 558 auto o = sc.make_hot_observable({ 559 on.next(150, 1), 560 on.next(215, 2), 561 on.completed(220) 562 }); 563 564 auto e = sc.make_hot_observable({ 565 on.next(150, 1), 566 on.completed(210) 567 }); 568 569 WHEN("each int is combined with the latest from the other source"){ 570 571 auto res = w.start( __anon9ee5cb451502() 572 [&]() { 573 return o 574 .with_latest_from( 575 [](int v2, int v1){ 576 return v2 + v1; 577 }, 578 e 579 ) 580 // forget type to workaround lambda deduction bug on msvc 2013 581 .as_dynamic(); 582 } 583 ); 584 585 THEN("the output contains only complete message"){ 586 auto required = rxu::to_vector({ 587 on.completed(220) 588 }); 589 auto actual = res.get_observer().messages(); 590 REQUIRE(required == actual); 591 } 592 593 THEN("there was one subscription and one unsubscription to the o"){ 594 auto required = rxu::to_vector({ 595 on.subscribe(200, 220) 596 }); 597 auto actual = o.subscriptions(); 598 REQUIRE(required == actual); 599 } 600 601 THEN("there was one subscription and one unsubscription to the e"){ 602 auto required = rxu::to_vector({ 603 on.subscribe(200, 210) 604 }); 605 auto actual = e.subscriptions(); 606 REQUIRE(required == actual); 607 } 608 } 609 } 610 } 611 612 SCENARIO("with_latest_from never/return", "[with_latest_from][join][operators]"){ 613 GIVEN("2 hot observables of ints."){ 614 auto sc = rxsc::make_test(); 615 auto w = sc.create_worker(); 616 const rxsc::test::messages<int> on; 617 618 auto n = sc.make_hot_observable({ 619 on.next(150, 1) 620 }); 621 622 auto o = sc.make_hot_observable({ 623 on.next(150, 1), 624 on.next(215, 2), 625 on.completed(220) 626 }); 627 628 WHEN("each int is combined with the latest from the other source"){ 629 630 auto res = w.start( __anon9ee5cb451702() 631 [&]() { 632 return n 633 .with_latest_from( 634 [](int v2, int v1){ 635 return v2 + v1; 636 }, 637 o 638 ) 639 // forget type to workaround lambda deduction bug on msvc 2013 640 .as_dynamic(); 641 } 642 ); 643 644 THEN("the output is empty"){ 645 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 646 auto actual = res.get_observer().messages(); 647 REQUIRE(required == actual); 648 } 649 650 THEN("there was one subscription and one unsubscription to the n"){ 651 auto required = rxu::to_vector({ 652 on.subscribe(200, 1000) 653 }); 654 auto actual = n.subscriptions(); 655 REQUIRE(required == actual); 656 } 657 658 THEN("there was one subscription and one unsubscription to the o"){ 659 auto required = rxu::to_vector({ 660 on.subscribe(200, 220) 661 }); 662 auto actual = o.subscriptions(); 663 REQUIRE(required == actual); 664 } 665 } 666 } 667 } 668 669 SCENARIO("with_latest_from return/never", "[with_latest_from][join][operators]"){ 670 GIVEN("2 hot observables of ints."){ 671 auto sc = rxsc::make_test(); 672 auto w = sc.create_worker(); 673 const rxsc::test::messages<int> on; 674 675 auto o = sc.make_hot_observable({ 676 on.next(150, 1), 677 on.next(215, 2), 678 on.completed(220) 679 }); 680 681 auto n = sc.make_hot_observable({ 682 on.next(150, 1) 683 }); 684 685 WHEN("each int is combined with the latest from the other source"){ 686 687 auto res = w.start( __anon9ee5cb451902() 688 [&]() { 689 return o 690 .with_latest_from( 691 [](int v2, int v1){ 692 return v2 + v1; 693 }, 694 n 695 ) 696 // forget type to workaround lambda deduction bug on msvc 2013 697 .as_dynamic(); 698 } 699 ); 700 701 THEN("the output is empty"){ 702 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 703 auto actual = res.get_observer().messages(); 704 REQUIRE(required == actual); 705 } 706 707 THEN("there was one subscription and one unsubscription to the n"){ 708 auto required = rxu::to_vector({ 709 on.subscribe(200, 1000) 710 }); 711 auto actual = n.subscriptions(); 712 REQUIRE(required == actual); 713 } 714 715 THEN("there was one subscription and one unsubscription to the o"){ 716 auto required = rxu::to_vector({ 717 on.subscribe(200, 220) 718 }); 719 auto actual = o.subscriptions(); 720 REQUIRE(required == actual); 721 } 722 } 723 } 724 } 725 726 727 SCENARIO("with_latest_from return/return", "[with_latest_from][join][operators]"){ 728 GIVEN("2 hot observables of ints."){ 729 auto sc = rxsc::make_test(); 730 auto w = sc.create_worker(); 731 const rxsc::test::messages<int> on; 732 733 auto o1 = sc.make_hot_observable({ 734 on.next(150, 1), 735 on.next(215, 2), 736 on.completed(230) 737 }); 738 739 auto o2 = sc.make_hot_observable({ 740 on.next(150, 1), 741 on.next(220, 3), 742 on.completed(240) 743 }); 744 745 WHEN("each int is combined with the latest from the other source"){ 746 747 auto res = w.start( __anon9ee5cb451b02() 748 [&]() { 749 return o1 750 .with_latest_from( 751 [](int v2, int v1){ 752 return v2 + v1; 753 }, 754 o2 755 ) 756 // forget type to workaround lambda deduction bug on msvc 2013 757 .as_dynamic(); 758 } 759 ); 760 761 THEN("the output contains combined ints"){ 762 auto required = rxu::to_vector({ 763 on.completed(240) 764 }); 765 auto actual = res.get_observer().messages(); 766 REQUIRE(required == actual); 767 } 768 769 THEN("there was one subscription and one unsubscription to the o1"){ 770 auto required = rxu::to_vector({ 771 on.subscribe(200, 230) 772 }); 773 auto actual = o1.subscriptions(); 774 REQUIRE(required == actual); 775 } 776 777 THEN("there was one subscription and one unsubscription to the o2"){ 778 auto required = rxu::to_vector({ 779 on.subscribe(200, 240) 780 }); 781 auto actual = o2.subscriptions(); 782 REQUIRE(required == actual); 783 } 784 } 785 } 786 } 787 788 SCENARIO("with_latest_from empty/error", "[with_latest_from][join][operators]"){ 789 GIVEN("2 hot observables of ints."){ 790 auto sc = rxsc::make_test(); 791 auto w = sc.create_worker(); 792 const rxsc::test::messages<int> on; 793 794 std::runtime_error ex("with_latest_from on_error from source"); 795 796 auto emp = sc.make_hot_observable({ 797 on.next(150, 1), 798 on.completed(230) 799 }); 800 801 auto err = sc.make_hot_observable({ 802 on.next(150, 1), 803 on.error(220, ex) 804 }); 805 806 WHEN("each int is combined with the latest from the other source"){ 807 808 auto res = w.start( __anon9ee5cb451d02() 809 [&]() { 810 return emp 811 .with_latest_from( 812 [](int v2, int v1){ 813 return v2 + v1; 814 }, 815 err 816 ) 817 // forget type to workaround lambda deduction bug on msvc 2013 818 .as_dynamic(); 819 } 820 ); 821 822 THEN("the output contains only error message"){ 823 auto required = rxu::to_vector({ 824 on.error(220, ex) 825 }); 826 auto actual = res.get_observer().messages(); 827 REQUIRE(required == actual); 828 } 829 830 THEN("there was one subscription and one unsubscription to the emp"){ 831 auto required = rxu::to_vector({ 832 on.subscribe(200, 220) 833 }); 834 auto actual = emp.subscriptions(); 835 REQUIRE(required == actual); 836 } 837 838 THEN("there was one subscription and one unsubscription to the err"){ 839 auto required = rxu::to_vector({ 840 on.subscribe(200, 220) 841 }); 842 auto actual = err.subscriptions(); 843 REQUIRE(required == actual); 844 } 845 } 846 } 847 } 848 849 SCENARIO("with_latest_from error/empty", "[with_latest_from][join][operators]"){ 850 GIVEN("2 hot observables of ints."){ 851 auto sc = rxsc::make_test(); 852 auto w = sc.create_worker(); 853 const rxsc::test::messages<int> on; 854 855 std::runtime_error ex("with_latest_from on_error from source"); 856 857 auto err = sc.make_hot_observable({ 858 on.next(150, 1), 859 on.error(220, ex) 860 }); 861 862 auto emp = sc.make_hot_observable({ 863 on.next(150, 1), 864 on.completed(230) 865 }); 866 867 WHEN("each int is combined with the latest from the other source"){ 868 869 auto res = w.start( __anon9ee5cb451f02() 870 [&]() { 871 return err 872 .with_latest_from( 873 [](int v2, int v1){ 874 return v2 + v1; 875 }, 876 emp 877 ) 878 // forget type to workaround lambda deduction bug on msvc 2013 879 .as_dynamic(); 880 } 881 ); 882 883 THEN("the output contains only error message"){ 884 auto required = rxu::to_vector({ 885 on.error(220, ex) 886 }); 887 auto actual = res.get_observer().messages(); 888 REQUIRE(required == actual); 889 } 890 891 THEN("there was one subscription and one unsubscription to the emp"){ 892 auto required = rxu::to_vector({ 893 on.subscribe(200, 220) 894 }); 895 auto actual = emp.subscriptions(); 896 REQUIRE(required == actual); 897 } 898 899 THEN("there was one subscription and one unsubscription to the err"){ 900 auto required = rxu::to_vector({ 901 on.subscribe(200, 220) 902 }); 903 auto actual = err.subscriptions(); 904 REQUIRE(required == actual); 905 } 906 } 907 } 908 } 909 910 SCENARIO("with_latest_from return/error", "[with_latest_from][join][operators]"){ 911 GIVEN("2 hot observables of ints."){ 912 auto sc = rxsc::make_test(); 913 auto w = sc.create_worker(); 914 const rxsc::test::messages<int> on; 915 916 std::runtime_error ex("with_latest_from on_error from source"); 917 918 auto o = sc.make_hot_observable({ 919 on.next(150, 1), 920 on.next(210, 2), 921 on.completed(230) 922 }); 923 924 auto err = sc.make_hot_observable({ 925 on.next(150, 1), 926 on.error(220, ex) 927 }); 928 929 WHEN("each int is combined with the latest from the other source"){ 930 931 auto res = w.start( __anon9ee5cb452102() 932 [&]() { 933 return o 934 .with_latest_from( 935 [](int v2, int v1){ 936 return v2 + v1; 937 }, 938 err 939 ) 940 // forget type to workaround lambda deduction bug on msvc 2013 941 .as_dynamic(); 942 } 943 ); 944 945 THEN("the output contains only error message"){ 946 auto required = rxu::to_vector({ 947 on.error(220, ex) 948 }); 949 auto actual = res.get_observer().messages(); 950 REQUIRE(required == actual); 951 } 952 953 THEN("there was one subscription and one unsubscription to the ret"){ 954 auto required = rxu::to_vector({ 955 on.subscribe(200, 220) 956 }); 957 auto actual = o.subscriptions(); 958 REQUIRE(required == actual); 959 } 960 961 THEN("there was one subscription and one unsubscription to the err"){ 962 auto required = rxu::to_vector({ 963 on.subscribe(200, 220) 964 }); 965 auto actual = err.subscriptions(); 966 REQUIRE(required == actual); 967 } 968 } 969 } 970 } 971 972 SCENARIO("with_latest_from error/return", "[with_latest_from][join][operators]"){ 973 GIVEN("2 hot observables of ints."){ 974 auto sc = rxsc::make_test(); 975 auto w = sc.create_worker(); 976 const rxsc::test::messages<int> on; 977 978 std::runtime_error ex("with_latest_from on_error from source"); 979 980 auto err = sc.make_hot_observable({ 981 on.next(150, 1), 982 on.error(220, ex) 983 }); 984 985 auto ret = sc.make_hot_observable({ 986 on.next(150, 1), 987 on.next(210, 2), 988 on.completed(230) 989 }); 990 991 WHEN("each int is combined with the latest from the other source"){ 992 993 auto res = w.start( __anon9ee5cb452302() 994 [&]() { 995 return err 996 .with_latest_from( 997 [](int v2, int v1){ 998 return v2 + v1; 999 }, 1000 ret 1001 ) 1002 // forget type to workaround lambda deduction bug on msvc 2013 1003 .as_dynamic(); 1004 } 1005 ); 1006 1007 THEN("the output contains only error message"){ 1008 auto required = rxu::to_vector({ 1009 on.error(220, ex) 1010 }); 1011 auto actual = res.get_observer().messages(); 1012 REQUIRE(required == actual); 1013 } 1014 1015 THEN("there was one subscription and one unsubscription to the ret"){ 1016 auto required = rxu::to_vector({ 1017 on.subscribe(200, 220) 1018 }); 1019 auto actual = ret.subscriptions(); 1020 REQUIRE(required == actual); 1021 } 1022 1023 THEN("there was one subscription and one unsubscription to the err"){ 1024 auto required = rxu::to_vector({ 1025 on.subscribe(200, 220) 1026 }); 1027 auto actual = err.subscriptions(); 1028 REQUIRE(required == actual); 1029 } 1030 } 1031 } 1032 } 1033 1034 SCENARIO("with_latest_from error/error", "[with_latest_from][join][operators]"){ 1035 GIVEN("2 hot observables of ints."){ 1036 auto sc = rxsc::make_test(); 1037 auto w = sc.create_worker(); 1038 const rxsc::test::messages<int> on; 1039 1040 std::runtime_error ex1("with_latest_from on_error from source 1"); 1041 std::runtime_error ex2("with_latest_from on_error from source 2"); 1042 1043 auto err1 = sc.make_hot_observable({ 1044 on.next(150, 1), 1045 on.error(220, ex1) 1046 }); 1047 1048 auto err2 = sc.make_hot_observable({ 1049 on.next(150, 1), 1050 on.error(230, ex2) 1051 }); 1052 1053 WHEN("each int is combined with the latest from the other source"){ 1054 1055 auto res = w.start( __anon9ee5cb452502() 1056 [&]() { 1057 return err1 1058 .with_latest_from( 1059 [](int v2, int v1){ 1060 return v2 + v1; 1061 }, 1062 err2 1063 ) 1064 // forget type to workaround lambda deduction bug on msvc 2013 1065 .as_dynamic(); 1066 } 1067 ); 1068 1069 THEN("the output contains only error message"){ 1070 auto required = rxu::to_vector({ 1071 on.error(220, ex1) 1072 }); 1073 auto actual = res.get_observer().messages(); 1074 REQUIRE(required == actual); 1075 } 1076 1077 THEN("there was one subscription and one unsubscription to the err1"){ 1078 auto required = rxu::to_vector({ 1079 on.subscribe(200, 220) 1080 }); 1081 auto actual = err1.subscriptions(); 1082 REQUIRE(required == actual); 1083 } 1084 1085 THEN("there was one subscription and one unsubscription to the err2"){ 1086 auto required = rxu::to_vector({ 1087 on.subscribe(200, 220) 1088 }); 1089 auto actual = err2.subscriptions(); 1090 REQUIRE(required == actual); 1091 } 1092 } 1093 } 1094 } 1095 1096 SCENARIO("with_latest_from next+error/error", "[with_latest_from][join][operators]"){ 1097 GIVEN("2 hot observables of ints."){ 1098 auto sc = rxsc::make_test(); 1099 auto w = sc.create_worker(); 1100 const rxsc::test::messages<int> on; 1101 1102 std::runtime_error ex1("with_latest_from on_error from source 1"); 1103 std::runtime_error ex2("with_latest_from on_error from source 2"); 1104 1105 auto err1 = sc.make_hot_observable({ 1106 on.next(150, 1), 1107 on.next(210, 2), 1108 on.error(220, ex1) 1109 }); 1110 1111 auto err2 = sc.make_hot_observable({ 1112 on.next(150, 1), 1113 on.error(230, ex2) 1114 }); 1115 1116 WHEN("each int is combined with the latest from the other source"){ 1117 1118 auto res = w.start( __anon9ee5cb452702() 1119 [&]() { 1120 return err1 1121 .with_latest_from( 1122 [](int v2, int v1){ 1123 return v2 + v1; 1124 }, 1125 err2 1126 ) 1127 // forget type to workaround lambda deduction bug on msvc 2013 1128 .as_dynamic(); 1129 } 1130 ); 1131 1132 THEN("the output contains only error message"){ 1133 auto required = rxu::to_vector({ 1134 on.error(220, ex1) 1135 }); 1136 auto actual = res.get_observer().messages(); 1137 REQUIRE(required == actual); 1138 } 1139 1140 THEN("there was one subscription and one unsubscription to the err1"){ 1141 auto required = rxu::to_vector({ 1142 on.subscribe(200, 220) 1143 }); 1144 auto actual = err1.subscriptions(); 1145 REQUIRE(required == actual); 1146 } 1147 1148 THEN("there was one subscription and one unsubscription to the err2"){ 1149 auto required = rxu::to_vector({ 1150 on.subscribe(200, 220) 1151 }); 1152 auto actual = err2.subscriptions(); 1153 REQUIRE(required == actual); 1154 } 1155 } 1156 } 1157 } 1158 1159 SCENARIO("with_latest_from error/next+error", "[with_latest_from][join][operators]"){ 1160 GIVEN("2 hot observables of ints."){ 1161 auto sc = rxsc::make_test(); 1162 auto w = sc.create_worker(); 1163 const rxsc::test::messages<int> on; 1164 1165 std::runtime_error ex1("with_latest_from on_error from source 1"); 1166 std::runtime_error ex2("with_latest_from on_error from source 2"); 1167 1168 auto err1 = sc.make_hot_observable({ 1169 on.next(150, 1), 1170 on.error(230, ex1) 1171 }); 1172 1173 auto err2 = sc.make_hot_observable({ 1174 on.next(150, 1), 1175 on.next(210, 2), 1176 on.error(220, ex2) 1177 }); 1178 1179 WHEN("each int is combined with the latest from the other source"){ 1180 1181 auto res = w.start( __anon9ee5cb452902() 1182 [&]() { 1183 return err1 1184 .with_latest_from( 1185 [](int v2, int v1){ 1186 return v2 + v1; 1187 }, 1188 err2 1189 ) 1190 // forget type to workaround lambda deduction bug on msvc 2013 1191 .as_dynamic(); 1192 } 1193 ); 1194 1195 THEN("the output contains only error message"){ 1196 auto required = rxu::to_vector({ 1197 on.error(220, ex2) 1198 }); 1199 auto actual = res.get_observer().messages(); 1200 REQUIRE(required == actual); 1201 } 1202 1203 THEN("there was one subscription and one unsubscription to the err1"){ 1204 auto required = rxu::to_vector({ 1205 on.subscribe(200, 220) 1206 }); 1207 auto actual = err1.subscriptions(); 1208 REQUIRE(required == actual); 1209 } 1210 1211 THEN("there was one subscription and one unsubscription to the err2"){ 1212 auto required = rxu::to_vector({ 1213 on.subscribe(200, 220) 1214 }); 1215 auto actual = err2.subscriptions(); 1216 REQUIRE(required == actual); 1217 } 1218 } 1219 } 1220 } 1221 1222 SCENARIO("with_latest_from never/error", "[with_latest_from][join][operators]"){ 1223 GIVEN("2 hot observables of ints."){ 1224 auto sc = rxsc::make_test(); 1225 auto w = sc.create_worker(); 1226 const rxsc::test::messages<int> on; 1227 1228 std::runtime_error ex("with_latest_from on_error from source"); 1229 1230 auto n = sc.make_hot_observable({ 1231 on.next(150, 1) 1232 }); 1233 1234 auto err = sc.make_hot_observable({ 1235 on.next(150, 1), 1236 on.error(220, ex) 1237 }); 1238 1239 WHEN("each int is combined with the latest from the other source"){ 1240 1241 auto res = w.start( __anon9ee5cb452b02() 1242 [&]() { 1243 return n 1244 .with_latest_from( 1245 [](int v2, int v1){ 1246 return v2 + v1; 1247 }, 1248 err 1249 ) 1250 // forget type to workaround lambda deduction bug on msvc 2013 1251 .as_dynamic(); 1252 } 1253 ); 1254 1255 THEN("the output contains only error message"){ 1256 auto required = rxu::to_vector({ 1257 on.error(220, ex) 1258 }); 1259 auto actual = res.get_observer().messages(); 1260 REQUIRE(required == actual); 1261 } 1262 1263 THEN("there was one subscription and one unsubscription to the n"){ 1264 auto required = rxu::to_vector({ 1265 on.subscribe(200, 220) 1266 }); 1267 auto actual = n.subscriptions(); 1268 REQUIRE(required == actual); 1269 } 1270 1271 THEN("there was one subscription and one unsubscription to the err"){ 1272 auto required = rxu::to_vector({ 1273 on.subscribe(200, 220) 1274 }); 1275 auto actual = err.subscriptions(); 1276 REQUIRE(required == actual); 1277 } 1278 } 1279 } 1280 } 1281 1282 SCENARIO("with_latest_from error/never", "[with_latest_from][join][operators]"){ 1283 GIVEN("2 hot observables of ints."){ 1284 auto sc = rxsc::make_test(); 1285 auto w = sc.create_worker(); 1286 const rxsc::test::messages<int> on; 1287 1288 std::runtime_error ex("with_latest_from on_error from source"); 1289 1290 auto err = sc.make_hot_observable({ 1291 on.next(150, 1), 1292 on.error(220, ex) 1293 }); 1294 1295 auto n = sc.make_hot_observable({ 1296 on.next(150, 1) 1297 }); 1298 1299 WHEN("each int is combined with the latest from the other source"){ 1300 1301 auto res = w.start( __anon9ee5cb452d02() 1302 [&]() { 1303 return err 1304 .with_latest_from( 1305 [](int v2, int v1){ 1306 return v2 + v1; 1307 }, 1308 n 1309 ) 1310 // forget type to workaround lambda deduction bug on msvc 2013 1311 .as_dynamic(); 1312 } 1313 ); 1314 1315 THEN("the output contains only error message"){ 1316 auto required = rxu::to_vector({ 1317 on.error(220, ex) 1318 }); 1319 auto actual = res.get_observer().messages(); 1320 REQUIRE(required == actual); 1321 } 1322 1323 THEN("there was one subscription and one unsubscription to the n"){ 1324 auto required = rxu::to_vector({ 1325 on.subscribe(200, 220) 1326 }); 1327 auto actual = n.subscriptions(); 1328 REQUIRE(required == actual); 1329 } 1330 1331 THEN("there was one subscription and one unsubscription to the err"){ 1332 auto required = rxu::to_vector({ 1333 on.subscribe(200, 220) 1334 }); 1335 auto actual = err.subscriptions(); 1336 REQUIRE(required == actual); 1337 } 1338 } 1339 } 1340 } 1341 1342 SCENARIO("with_latest_from error after completed left", "[with_latest_from][join][operators]"){ 1343 GIVEN("2 hot observables of ints."){ 1344 auto sc = rxsc::make_test(); 1345 auto w = sc.create_worker(); 1346 const rxsc::test::messages<int> on; 1347 1348 std::runtime_error ex("with_latest_from on_error from source"); 1349 1350 auto ret = sc.make_hot_observable({ 1351 on.next(150, 1), 1352 on.next(210, 2), 1353 on.completed(215) 1354 }); 1355 1356 auto err = sc.make_hot_observable({ 1357 on.next(150, 1), 1358 on.error(220, ex) 1359 }); 1360 1361 WHEN("each int is combined with the latest from the other source"){ 1362 1363 auto res = w.start( __anon9ee5cb452f02() 1364 [&]() { 1365 return ret 1366 .with_latest_from( 1367 [](int v2, int v1){ 1368 return v2 + v1; 1369 }, 1370 err 1371 ) 1372 // forget type to workaround lambda deduction bug on msvc 2013 1373 .as_dynamic(); 1374 } 1375 ); 1376 1377 THEN("the output contains only error message"){ 1378 auto required = rxu::to_vector({ 1379 on.error(220, ex) 1380 }); 1381 auto actual = res.get_observer().messages(); 1382 REQUIRE(required == actual); 1383 } 1384 1385 THEN("there was one subscription and one unsubscription to the ret"){ 1386 auto required = rxu::to_vector({ 1387 on.subscribe(200, 215) 1388 }); 1389 auto actual = ret.subscriptions(); 1390 REQUIRE(required == actual); 1391 } 1392 1393 THEN("there was one subscription and one unsubscription to the err"){ 1394 auto required = rxu::to_vector({ 1395 on.subscribe(200, 220) 1396 }); 1397 auto actual = err.subscriptions(); 1398 REQUIRE(required == actual); 1399 } 1400 } 1401 } 1402 } 1403 1404 SCENARIO("with_latest_from error after completed right", "[with_latest_from][join][operators]"){ 1405 GIVEN("2 hot observables of ints."){ 1406 auto sc = rxsc::make_test(); 1407 auto w = sc.create_worker(); 1408 const rxsc::test::messages<int> on; 1409 1410 std::runtime_error ex("with_latest_from on_error from source"); 1411 1412 auto err = sc.make_hot_observable({ 1413 on.next(150, 1), 1414 on.error(220, ex) 1415 }); 1416 1417 auto ret = sc.make_hot_observable({ 1418 on.next(150, 1), 1419 on.next(210, 2), 1420 on.completed(215) 1421 }); 1422 1423 WHEN("each int is combined with the latest from the other source"){ 1424 1425 auto res = w.start( __anon9ee5cb453102() 1426 [&]() { 1427 return err 1428 .with_latest_from( 1429 [](int v2, int v1){ 1430 return v2 + v1; 1431 }, 1432 ret 1433 ) 1434 // forget type to workaround lambda deduction bug on msvc 2013 1435 .as_dynamic(); 1436 } 1437 ); 1438 1439 THEN("the output contains only error message"){ 1440 auto required = rxu::to_vector({ 1441 on.error(220, ex) 1442 }); 1443 auto actual = res.get_observer().messages(); 1444 REQUIRE(required == actual); 1445 } 1446 1447 THEN("there was one subscription and one unsubscription to the ret"){ 1448 auto required = rxu::to_vector({ 1449 on.subscribe(200, 215) 1450 }); 1451 auto actual = ret.subscriptions(); 1452 REQUIRE(required == actual); 1453 } 1454 1455 THEN("there was one subscription and one unsubscription to the err"){ 1456 auto required = rxu::to_vector({ 1457 on.subscribe(200, 220) 1458 }); 1459 auto actual = err.subscriptions(); 1460 REQUIRE(required == actual); 1461 } 1462 } 1463 } 1464 } 1465 1466 SCENARIO("with_latest_from selector throws", "[with_latest_from][join][operators][!throws]"){ 1467 GIVEN("2 hot observables of ints."){ 1468 auto sc = rxsc::make_test(); 1469 auto w = sc.create_worker(); 1470 const rxsc::test::messages<int> on; 1471 1472 std::runtime_error ex("with_latest_from on_error from source"); 1473 1474 auto o1 = sc.make_hot_observable({ 1475 on.next(150, 1), 1476 on.next(215, 2), 1477 on.completed(230) 1478 }); 1479 1480 auto o2 = sc.make_hot_observable({ 1481 on.next(150, 1), 1482 on.next(220, 3), 1483 on.completed(240) 1484 }); 1485 1486 WHEN("each int is combined with the latest from the other source"){ 1487 1488 auto res = w.start( __anon9ee5cb453302() 1489 [&]() { 1490 return o2 1491 .with_latest_from( 1492 [&ex](int, int) -> int { 1493 rxu::throw_exception(ex); 1494 }, 1495 o1 1496 ) 1497 // forget type to workaround lambda deduction bug on msvc 2013 1498 .as_dynamic(); 1499 } 1500 ); 1501 1502 THEN("the output contains only error"){ 1503 auto required = rxu::to_vector({ 1504 on.error(220, ex) 1505 }); 1506 auto actual = res.get_observer().messages(); 1507 REQUIRE(required == actual); 1508 } 1509 1510 THEN("there was one subscription and one unsubscription to the o1"){ 1511 auto required = rxu::to_vector({ 1512 on.subscribe(200, 220) 1513 }); 1514 auto actual = o1.subscriptions(); 1515 REQUIRE(required == actual); 1516 } 1517 1518 THEN("there was one subscription and one unsubscription to the o2"){ 1519 auto required = rxu::to_vector({ 1520 on.subscribe(200, 220) 1521 }); 1522 auto actual = o2.subscriptions(); 1523 REQUIRE(required == actual); 1524 } 1525 } 1526 } 1527 } 1528 1529 SCENARIO("with_latest_from selector throws N", "[with_latest_from][join][operators][!throws]"){ 1530 GIVEN("N hot observables of ints."){ 1531 auto sc = rxsc::make_test(); 1532 auto w = sc.create_worker(); 1533 const rxsc::test::messages<int> on; 1534 1535 const int N = 4; 1536 1537 std::runtime_error ex("with_latest_from on_error from source"); 1538 1539 std::vector<rxcpp::test::testable_observable<int>> e; 1540 for (int i = 0; i < N; ++i) { 1541 e.push_back( 1542 sc.make_hot_observable({ 1543 on.next(210 + 10 * i, 1), 1544 on.completed(500) 1545 }) 1546 ); 1547 } 1548 1549 WHEN("each int is combined with the latest from the other source"){ 1550 1551 auto res = w.start( __anon9ee5cb453502() 1552 [&]() { 1553 return e[3] 1554 .with_latest_from( 1555 [&ex](int, int, int, int) -> int { 1556 rxu::throw_exception(ex); 1557 }, 1558 e[0], e[1], e[2] 1559 ) 1560 // forget type to workaround lambda deduction bug on msvc 2013 1561 .as_dynamic(); 1562 } 1563 ); 1564 1565 THEN("the output contains only error"){ 1566 auto required = rxu::to_vector({ 1567 on.error(200 + 10 * N, ex) 1568 }); 1569 auto actual = res.get_observer().messages(); 1570 REQUIRE(required == actual); 1571 } 1572 1573 THEN("there was one subscription and one unsubscription to each observable"){ 1574 __anon9ee5cb453702(rxcpp::test::testable_observable<int> &s)1575 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ 1576 auto required = rxu::to_vector({ 1577 on.subscribe(200, 200 + 10 * N) 1578 }); 1579 auto actual = s.subscriptions(); 1580 REQUIRE(required == actual); 1581 }); 1582 } 1583 } 1584 } 1585 } 1586 1587 SCENARIO("with_latest_from typical N", "[with_latest_from][join][operators]"){ 1588 GIVEN("N hot observables of ints."){ 1589 auto sc = rxsc::make_test(); 1590 auto w = sc.create_worker(); 1591 const rxsc::test::messages<int> on; 1592 1593 const int N = 4; 1594 1595 std::vector<rxcpp::test::testable_observable<int>> o; 1596 for (int i = 0; i < N; ++i) { 1597 o.push_back( 1598 sc.make_hot_observable({ 1599 on.next(150, 1), 1600 on.next(210 + 10 * i, i + 1), 1601 on.next(410 + 10 * i, i + N + 1), 1602 on.completed(800) 1603 }) 1604 ); 1605 } 1606 1607 WHEN("each int is combined with the latest from the other source"){ 1608 1609 auto res = w.start( __anon9ee5cb453802() 1610 [&]() { 1611 return o[3] 1612 .with_latest_from( 1613 [](int v0, int v1, int v2, int v3) { 1614 return v0 + v1 + v2 + v3; 1615 }, 1616 o[0], o[1], o[2] 1617 ) 1618 // forget type to workaround lambda deduction bug on msvc 2013 1619 .as_dynamic(); 1620 } 1621 ); 1622 1623 THEN("the output contains combined ints"){ 1624 auto required = rxu::to_vector({ 1625 on.next(200 + 10 * N, N * (N + 1) / 2), 1626 on.next(410 + 10 * (N - 1), (N - 1) * N / 2 + N + N * N) 1627 }); 1628 required.push_back(on.completed(800)); 1629 auto actual = res.get_observer().messages(); 1630 REQUIRE(required == actual); 1631 } 1632 1633 THEN("there was one subscription and one unsubscription to each observable"){ 1634 __anon9ee5cb453a02(rxcpp::test::testable_observable<int> &s)1635 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){ 1636 auto required = rxu::to_vector({ 1637 on.subscribe(200, 800) 1638 }); 1639 auto actual = s.subscriptions(); 1640 REQUIRE(required == actual); 1641 }); 1642 } 1643 } 1644 } 1645 } 1646