1 #include "../test.h" 2 #include "rxcpp/operators/rx-sample_time.hpp" 3 4 SCENARIO("sample with time, error", "[sample_with_time][operators]"){ 5 GIVEN("1 hot observable of ints."){ 6 auto sc = rxsc::make_test(); 7 auto so = rx::synchronize_in_one_worker(sc); 8 auto w = sc.create_worker(); 9 const rxsc::test::messages<int> on; 10 11 std::runtime_error ex("sample_with_time on_error from source"); 12 13 auto xs = sc.make_hot_observable({ 14 on.next(100, 1), 15 on.next(210, 2), 16 on.next(240, 3), 17 on.next(280, 4), 18 on.next(320, 5), 19 on.next(350, 6), 20 on.next(380, 7), 21 on.next(420, 8), 22 on.next(470, 9), 23 on.error(600, ex) 24 }); 25 WHEN("group ints on intersecting intervals"){ 26 using namespace std::chrono; 27 28 auto res = w.start( __anon94cd60ac0102() 29 [&]() { 30 return xs 31 | rxo::sample_with_time(milliseconds(100), so) 32 | rxo::as_dynamic(); 33 } 34 ); 35 36 THEN("the output contains groups of ints"){ 37 auto required = rxu::to_vector({ 38 on.next(301, 4), 39 on.next(401, 7), 40 on.next(501, 9), 41 on.error(601, ex) 42 }); 43 auto actual = res.get_observer().messages(); 44 REQUIRE(required == actual); 45 } 46 47 THEN("there was one subscription and one unsubscription to the xs"){ 48 auto required = rxu::to_vector({ 49 on.subscribe(200, 600) 50 }); 51 auto actual = xs.subscriptions(); 52 REQUIRE(required == actual); 53 } 54 } 55 } 56 } 57 58 SCENARIO("sample with time, disposed", "[sample_with_time][operators]"){ 59 GIVEN("1 hot observable of ints."){ 60 auto sc = rxsc::make_test(); 61 auto so = rx::synchronize_in_one_worker(sc); 62 auto w = sc.create_worker(); 63 const rxsc::test::messages<int> on; 64 65 auto xs = sc.make_hot_observable({ 66 on.next(100, 1), 67 on.next(210, 2), 68 on.next(240, 3), 69 on.next(280, 4), // 70 on.next(320, 5), 71 on.next(350, 6), 72 on.next(380, 7), 73 on.next(420, 8), 74 on.next(470, 9), 75 on.completed(600) 76 }); 77 WHEN("group ints on intersecting intervals"){ 78 using namespace std::chrono; 79 80 auto res = w.start( __anon94cd60ac0202() 81 [&]() { 82 return xs 83 .sample_with_time(milliseconds(100), so) 84 .as_dynamic(); 85 }, 86 370 87 ); 88 89 THEN("the output contains groups of ints"){ 90 auto required = rxu::to_vector({ 91 on.next(301, 4), 92 }); 93 auto actual = res.get_observer().messages(); 94 REQUIRE(required == actual); 95 } 96 97 THEN("there was one subscription and one unsubscription to the xs"){ 98 auto required = rxu::to_vector({ 99 on.subscribe(200, 371) 100 }); 101 auto actual = xs.subscriptions(); 102 REQUIRE(required == actual); 103 } 104 } 105 } 106 } 107 108 SCENARIO("sample with time, same", "[sample_with_time][operators]"){ 109 GIVEN("1 hot observable of ints."){ 110 auto sc = rxsc::make_test(); 111 auto so = rx::synchronize_in_one_worker(sc); 112 auto w = sc.create_worker(); 113 const rxsc::test::messages<int> on; 114 const rxsc::test::messages<std::vector<int>> v_on; 115 116 auto xs = sc.make_hot_observable({ 117 on.next(100, 1), 118 on.next(210, 2), 119 on.next(240, 3), 120 on.next(280, 4), 121 on.next(320, 5), 122 on.next(350, 6), 123 on.next(380, 7), 124 on.next(420, 8), 125 on.next(470, 9), 126 on.completed(600) 127 }); 128 WHEN("group ints on intervals"){ 129 using namespace std::chrono; 130 131 auto res = w.start( __anon94cd60ac0302() 132 [&]() { 133 return xs 134 .sample_with_time(milliseconds(100), so) 135 .as_dynamic(); 136 } 137 ); 138 139 THEN("the output contains groups of ints"){ 140 auto required = rxu::to_vector({ 141 on.next(301, 4), 142 on.next(401, 7), 143 on.next(501, 9), 144 on.completed(601) 145 }); 146 auto actual = res.get_observer().messages(); 147 REQUIRE(required == actual); 148 } 149 150 THEN("there was one subscription and one unsubscription to the xs"){ 151 auto required = rxu::to_vector({ 152 on.subscribe(200, 600) 153 }); 154 auto actual = xs.subscriptions(); 155 REQUIRE(required == actual); 156 } 157 } 158 } 159 } 160