1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP)
6 #define RXCPP_RX_REPLAYSUBJECT_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class Coordination>
17 struct replay_traits
18 {
19     typedef rxu::maybe<std::size_t> count_type;
20     typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type;
21     typedef rxsc::scheduler::clock_type::time_point time_point_type;
22     typedef rxu::decay_t<Coordination> coordination_type;
23     typedef typename coordination_type::coordinator_type coordinator_type;
24 };
25 
26 template<class T, class Coordination>
27 class replay_observer : public detail::multicast_observer<T>
28 {
29     typedef replay_observer<T, Coordination> this_type;
30     typedef detail::multicast_observer<T> base_type;
31 
32     typedef replay_traits<Coordination> traits;
33     typedef typename traits::count_type count_type;
34     typedef typename traits::period_type period_type;
35     typedef typename traits::time_point_type time_point_type;
36     typedef typename traits::coordination_type coordination_type;
37     typedef typename traits::coordinator_type coordinator_type;
38 
39     class replay_observer_state : public std::enable_shared_from_this<replay_observer_state>
40     {
41         mutable std::mutex lock;
42         mutable std::list<T> values;
43         mutable std::list<time_point_type> time_points;
44         mutable count_type count;
45         mutable period_type period;
46         mutable composite_subscription replayLifetime;
47     public:
48         mutable coordination_type coordination;
49         mutable coordinator_type coordinator;
50 
51     private:
remove_oldest() const52         void remove_oldest() const {
53             values.pop_front();
54             if (!period.empty()) {
55                 time_points.pop_front();
56             }
57         }
58 
59     public:
~replay_observer_state()60         ~replay_observer_state(){
61             replayLifetime.unsubscribe();
62         }
replay_observer_state(count_type _count,period_type _period,coordination_type _coordination,coordinator_type _coordinator,composite_subscription _replayLifetime)63         explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
64             : count(_count)
65             , period(_period)
66             , replayLifetime(_replayLifetime)
67             , coordination(std::move(_coordination))
68             , coordinator(std::move(_coordinator))
69         {
70         }
71 
add(T v) const72         void add(T v) const {
73             std::unique_lock<std::mutex> guard(lock);
74 
75             if (!count.empty()) {
76                 if (values.size() == count.get())
77                     remove_oldest();
78             }
79 
80             if (!period.empty()) {
81                 auto now = coordination.now();
82                 while (!time_points.empty() && (now - time_points.front() > period.get()))
83                     remove_oldest();
84                 time_points.push_back(now);
85             }
86 
87             values.push_back(std::move(v));
88         }
get() const89         std::list<T> get() const {
90             std::unique_lock<std::mutex> guard(lock);
91             return values;
92         }
93     };
94 
95     std::shared_ptr<replay_observer_state> state;
96 
97 public:
replay_observer(count_type count,period_type period,coordination_type coordination,composite_subscription replayLifetime,composite_subscription subscriberLifetime)98     replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
99         : base_type(subscriberLifetime)
100     {
101         replayLifetime.add(subscriberLifetime);
102         auto coordinator = coordination.create_coordinator(replayLifetime);
103         state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
104     }
105 
get_subscriber() const106     subscriber<T> get_subscriber() const {
107         return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).as_dynamic();
108     }
109 
get_values() const110     std::list<T> get_values() const {
111         return state->get();
112     }
113 
get_coordinator() const114     coordinator_type& get_coordinator() const {
115         return state->coordinator;
116     }
117 
118     template<class V>
on_next(V v) const119     void on_next(V v) const {
120         state->add(v);
121         base_type::on_next(std::move(v));
122     }
123 };
124 
125 }
126 
127 template<class T, class Coordination>
128 class replay
129 {
130     typedef detail::replay_traits<Coordination> traits;
131     typedef typename traits::count_type count_type;
132     typedef typename traits::period_type period_type;
133     typedef typename traits::time_point_type time_point_type;
134 
135     detail::replay_observer<T, Coordination> s;
136 
137 public:
replay(Coordination cn,composite_subscription cs=composite_subscription ())138     explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
139         : s(count_type(), period_type(), cn, cs, composite_subscription{})
140     {
141     }
142 
replay(std::size_t count,Coordination cn,composite_subscription cs=composite_subscription ())143     replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
144         : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{})
145     {
146     }
147 
replay(rxsc::scheduler::clock_type::duration period,Coordination cn,composite_subscription cs=composite_subscription ())148     replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
149         : s(count_type(), period_type(period), cn, cs, composite_subscription{})
150     {
151     }
152 
replay(std::size_t count,rxsc::scheduler::clock_type::duration period,Coordination cn,composite_subscription cs=composite_subscription ())153     replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
154         : s(count_type(count), period_type(period), cn, cs, composite_subscription{})
155     {
156     }
157 
has_observers() const158     bool has_observers() const {
159         return s.has_observers();
160     }
161 
get_values() const162     std::list<T> get_values() const {
163         return s.get_values();
164     }
165 
get_subscriber() const166     subscriber<T> get_subscriber() const {
167         return s.get_subscriber();
168     }
169 
get_observable() const170     observable<T> get_observable() const {
171         auto keepAlive = s;
172         auto observable = make_observable_dynamic<T>([=](subscriber<T> o){
173             for (auto&& value: get_values()) {
174                 o.on_next(value);
175             }
176             keepAlive.add(keepAlive.get_subscriber(), std::move(o));
177         });
178         return s.get_coordinator().in(observable);
179     }
180 };
181 
182 }
183 
184 }
185 
186 #endif
187