1 #include "../test.h"
2 #include "rxcpp/operators/rx-time_interval.hpp"
3 
4 using namespace std::chrono;
5 
6 SCENARIO("should not emit time intervals if the source never emits any items", "[time_interval][operators]"){
7     GIVEN("a source"){
8         typedef rxsc::detail::test_type::clock_type::time_point::duration duration;
9 
10         auto sc = rxsc::make_test();
11         auto w = sc.create_worker();
12         const rxsc::test::messages<int> on;
13 
14         auto xs = sc.make_hot_observable({
15             on.next(150, 1)
16         });
17 
18         WHEN("time_interval operator is invoked"){
19 
20             auto res = w.start(
__anonf55c643d0102() 21                 [xs]() {
22                     return xs
23                         | rxo::time_interval();
24                 }
25             );
26 
27             THEN("the output is empty"){
28                 auto required = std::vector<rxsc::test::messages<duration>::recorded_type>();
29                 auto actual = res.get_observer().messages();
30                 REQUIRE(required == actual);
31             }
32 
33             THEN("there was 1 subscription/unsubscription to the source"){
34                 auto required = rxu::to_vector({
35                     on.subscribe(200, 1000)
36                 });
37                 auto actual = xs.subscriptions();
38                 REQUIRE(required == actual);
39             }
40         }
41     }
42 }
43 
44 SCENARIO("should not emit time intervals if the source observable is empty", "[time_interval][operators]"){
45     GIVEN("a source"){
46         typedef rxsc::detail::test_type::clock_type::time_point::duration duration;
47 
48         auto sc = rxsc::make_test();
49         auto so = rx::synchronize_in_one_worker(sc);
50         auto w = sc.create_worker();
51         const rxsc::test::messages<int> on;
52         const rxsc::test::messages<duration> on_time_interval;
53 
54         auto xs = sc.make_hot_observable({
55             on.next(150, 1),
56             on.completed(250)
57         });
58 
59         WHEN("time_interval operator is invoked"){
60 
61             auto res = w.start(
__anonf55c643d0202() 62                 [so, xs]() {
63                     return xs.time_interval();
64                 }
65             );
66 
67             THEN("the output only contains complete message"){
68                 auto required = rxu::to_vector({
69                     on_time_interval.completed(250)
70                 });
71                 auto actual = res.get_observer().messages();
72                 REQUIRE(required == actual);
73             }
74 
75             THEN("there was 1 subscription/unsubscription to the source"){
76                 auto required = rxu::to_vector({
77                     on.subscribe(200, 250)
78                 });
79                 auto actual = xs.subscriptions();
80                 REQUIRE(required == actual);
81             }
82 
83         }
84     }
85 }
86 
87 SCENARIO("should emit time intervals for every item in the source observable", "[time_interval][operators]"){
88     GIVEN("a source"){
89         typedef rxsc::detail::test_type::clock_type clock_type;
90         typedef clock_type::time_point::duration duration;
91 
92         auto sc = rxsc::make_test();
93         auto so = rx::synchronize_in_one_worker(sc);
94         auto w = sc.create_worker();
95         const rxsc::test::messages<int> on;
96         const rxsc::test::messages<duration> on_time_interval;
97 
98         auto xs = sc.make_hot_observable({
99             on.next(150, 1),
100             on.next(210, 2),
101             on.next(240, 3),
102             on.completed(250)
103         });
104 
105         WHEN("time_interval operator is invoked"){
106 
107             auto res = w.start(
__anonf55c643d0302() 108                 [so, xs]() {
109                     return xs.time_interval(so);
110                 }
111             );
112 
113             THEN("the output contains the emitted items while subscribed"){
114                 auto required = rxu::to_vector({
115                     on_time_interval.next(210, milliseconds(10)),
116                     on_time_interval.next(240, milliseconds(30)),
117                     on_time_interval.completed(250)
118                 });
119                 auto actual = res.get_observer().messages();
120                 REQUIRE(required == actual);
121             }
122 
123             THEN("there was 1 subscription/unsubscription to the source"){
124                 auto required = rxu::to_vector({
125                     on.subscribe(200, 250)
126                 });
127                 auto actual = xs.subscriptions();
128                 REQUIRE(required == actual);
129             }
130 
131         }
132     }
133 }
134 
135 SCENARIO("should emit time intervals and an error if there is an error", "[time_interval][operators]"){
136     GIVEN("a source"){
137         typedef rxsc::detail::test_type::clock_type clock_type;
138         typedef clock_type::time_point::duration duration;
139 
140         auto sc = rxsc::make_test();
141         auto so = rx::synchronize_in_one_worker(sc);
142         auto w = sc.create_worker();
143         const rxsc::test::messages<int> on;
144         const rxsc::test::messages<duration> on_time_interval;
145 
146         std::runtime_error ex("on_error from source");
147 
148         auto xs = sc.make_hot_observable({
149             on.next(150, 1),
150             on.next(210, 2),
151             on.next(240, 3),
152             on.error(250, ex)
153         });
154 
155         WHEN("time_interval operator is invoked"){
156 
157             auto res = w.start(
__anonf55c643d0402() 158                 [so, xs]() {
159                     return xs.time_interval(so);
160                 }
161             );
162 
163             THEN("the output contains emitted items and an error"){
164                 auto required = rxu::to_vector({
165                     on_time_interval.next(210, milliseconds(10)),
166                     on_time_interval.next(240, milliseconds(30)),
167                     on_time_interval.error(250, ex)
168                 });
169                 auto actual = res.get_observer().messages();
170                 REQUIRE(required == actual);
171             }
172 
173             THEN("there was 1 subscription/unsubscription to the source"){
174                 auto required = rxu::to_vector({
175                     on.subscribe(200, 250)
176                 });
177                 auto actual = xs.subscriptions();
178                 REQUIRE(required == actual);
179             }
180 
181         }
182     }
183 }
184