1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_NOTIFICATION_HPP)
6 #define RXCPP_RX_NOTIFICATION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace notifications {
13 
/// Value type holding a pair of longs: the value reported by subscribe()
/// and the value reported by unsubscribe().
/// NOTE(review): presumably these are test-scheduler clock ticks — confirm.
class subscription
{
    long subscribed;
    long unsubscribed;

public:
    /// Builds a subscription whose unsubscribe() value is the largest
    /// representable long.
    explicit subscription(long s)
        : subscription(s, std::numeric_limits<long>::max()) {
    }

    /// Builds a subscription with both values given explicitly.
    subscription(long s, long u)
        : subscribed(s)
        , unsubscribed(u) {
    }

    /// Returns the first value passed at construction.
    long subscribe() const {
        return subscribed;
    }

    /// Returns the second value passed at construction, or
    /// std::numeric_limits<long>::max() when only one value was supplied.
    long unsubscribe() const {
        return unsubscribed;
    }
};
33 
operator ==(subscription lhs,subscription rhs)34 inline bool operator == (subscription lhs, subscription rhs) {
35     return lhs.subscribe() == rhs.subscribe() && lhs.unsubscribe() == rhs.unsubscribe();
36 }
37 
operator <<(std::ostream & out,const subscription & s)38 inline std::ostream& operator<< (std::ostream& out, const subscription& s) {
39     out << s.subscribe() << "-" << s.unsubscribe();
40     return out;
41 }
42 
43 namespace detail {
44 
45 template<typename T>
46 struct notification_base
47     : public std::enable_shared_from_this<notification_base<T>>
48 {
49     typedef subscriber<T> observer_type;
50     typedef std::shared_ptr<notification_base<T>> type;
51 
~notification_baserxcpp::notifications::detail::notification_base52     virtual ~notification_base() {}
53 
54     virtual void out(std::ostream& out) const =0;
55     virtual bool equals(const type& other) const = 0;
56     virtual void accept(const observer_type& o) const =0;
57 };
58 
/// Forward declaration of the vector stream operator defined later in this
/// namespace; declaring it here makes it visible to the templates that follow.
template<typename T>
std::ostream& operator<< (std::ostream& out, const std::vector<T>& v);
61 
/// Preferred overload: participates only when `os << t` is a valid
/// expression, and simply forwards to it. The two trailing int parameters
/// exist purely to rank this overload ahead of the variadic fallbacks.
template<typename T>
auto to_stream(std::ostream& os, const T& t, int, int)
    -> decltype(os << t)
{
    return os << t;
}
67 
68 #if RXCPP_USE_RTTI
/// Fallback used when RTTI is enabled: instead of the value, writes a
/// placeholder that names the type via typeid(T).name().
template<typename T>
std::ostream& to_stream(std::ostream& os, const T&, int, ...) {
    os << "< ";
    os << typeid(T).name();
    os << " does not support ostream>";
    return os;
}
73 #endif
74 
/// Last-resort fallback: ignores the value and writes a fixed placeholder.
template<typename T>
std::ostream& to_stream(std::ostream& os, const T&, ...) {
    os << "<the value does not support ostream>";
    return os;
}
79 
/// Writes the elements of `v` as "[e0, e1, ...]" (an empty vector prints
/// "[]"). Each element is emitted through to_stream, so element types
/// without a stream operator produce a placeholder instead of an error.
template<class T>
inline std::ostream& ostreamvector (std::ostream& os, const std::vector<T>& v) {
    os << "[";
    auto cursor = v.begin();
    const auto last = v.end();
    if (cursor != last) {
        // The first element has no leading separator.
        to_stream(os, *cursor, 0, 0);
        for (++cursor; cursor != last; ++cursor) {
            os << ", ";
            to_stream(os, *cursor, 0, 0);
        }
    }
    os << "]";
    return os;
}
95 
/// Stream operator for vectors; delegates the formatting to ostreamvector.
template<typename T>
inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) {
    ostreamvector(os, v);
    return os;
}
100 
/// Preferred equality overload: participates only when `lhs == rhs` is valid
/// and explicitly convertible to bool. The trailing int ranks this overload
/// ahead of the variadic fallback.
///
/// @return the result of `lhs == rhs`, converted to bool.
template<class T>
auto equals(const T& lhs, const T& rhs, int)
    -> decltype(bool(lhs == rhs)) {
    // Convert explicitly, mirroring the SFINAE condition above; an implicit
    // conversion would reject comparison results that only offer an
    // `explicit operator bool` even though this overload was selected.
    return static_cast<bool>(lhs == rhs);
}
106 
107 template<class T>
equals(const T &,const T &,...)108 bool equals(const T&, const T&, ...) {
109     rxu::throw_exception(std::runtime_error("value does not support equality tests"));
110     return false;
111 }
112 
113 }
114 
115 template<typename T>
116 struct notification
117 {
118     typedef typename detail::notification_base<T>::type type;
119     typedef typename detail::notification_base<T>::observer_type observer_type;
120 
121 private:
122     typedef detail::notification_base<T> base;
123 
124     struct on_next_notification : public base {
on_next_notificationrxcpp::notifications::notification::on_next_notification125         on_next_notification(T value) : value(std::move(value)) {
126         }
on_next_notificationrxcpp::notifications::notification::on_next_notification127         on_next_notification(const on_next_notification& o) : value(o.value) {}
on_next_notificationrxcpp::notifications::notification::on_next_notification128         on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {}
operator =rxcpp::notifications::notification::on_next_notification129         on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; }
outrxcpp::notifications::notification::on_next_notification130         virtual void out(std::ostream& os) const {
131             os << "on_next( ";
132             detail::to_stream(os, value, 0, 0);
133             os << ")";
134         }
equalsrxcpp::notifications::notification::on_next_notification135         virtual bool equals(const typename base::type& other) const {
136             bool result = false;
137             other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T v) {
138                     result = detail::equals(this->value, v, 0);
139                 })));
140             return result;
141         }
acceptrxcpp::notifications::notification::on_next_notification142         virtual void accept(const typename base::observer_type& o) const {
143             o.on_next(value);
144         }
145         const T value;
146     };
147 
148     struct on_error_notification : public base {
on_error_notificationrxcpp::notifications::notification::on_error_notification149         on_error_notification(rxu::error_ptr ep) : ep(ep) {
150         }
on_error_notificationrxcpp::notifications::notification::on_error_notification151         on_error_notification(const on_error_notification& o) : ep(o.ep) {}
on_error_notificationrxcpp::notifications::notification::on_error_notification152         on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {}
operator =rxcpp::notifications::notification::on_error_notification153         on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; }
outrxcpp::notifications::notification::on_error_notification154         virtual void out(std::ostream& os) const {
155             os << "on_error(";
156             os << rxu::what(ep);
157             os << ")";
158         }
equalsrxcpp::notifications::notification::on_error_notification159         virtual bool equals(const typename base::type& other) const {
160             bool result = false;
161             // not trying to compare exceptions
162             other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](rxu::error_ptr){
163                 result = true;
164             })));
165             return result;
166         }
acceptrxcpp::notifications::notification::on_error_notification167         virtual void accept(const typename base::observer_type& o) const {
168             o.on_error(ep);
169         }
170         const rxu::error_ptr ep;
171     };
172 
173     struct on_completed_notification : public base {
on_completed_notificationrxcpp::notifications::notification::on_completed_notification174         on_completed_notification() {
175         }
outrxcpp::notifications::notification::on_completed_notification176         virtual void out(std::ostream& os) const {
177             os << "on_completed()";
178         }
equalsrxcpp::notifications::notification::on_completed_notification179         virtual bool equals(const typename base::type& other) const {
180             bool result = false;
181             other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){
182                 result = true;
183             })));
184             return result;
185         }
acceptrxcpp::notifications::notification::on_completed_notification186         virtual void accept(const typename base::observer_type& o) const {
187             o.on_completed();
188         }
189     };
190 
191     struct exception_tag {};
192 
193     template<typename Exception>
194     static
make_on_errorrxcpp::notifications::notification195     type make_on_error(exception_tag&&, Exception&& e) {
196         rxu::error_ptr ep = rxu::make_error_ptr(std::forward<Exception>(e));
197         return std::make_shared<on_error_notification>(ep);
198     }
199 
200     struct exception_ptr_tag {};
201 
202     static
make_on_errorrxcpp::notifications::notification203     type make_on_error(exception_ptr_tag&&, rxu::error_ptr ep) {
204         return std::make_shared<on_error_notification>(ep);
205     }
206 
207 public:
208     template<typename U>
on_nextrxcpp::notifications::notification209     static type on_next(U value) {
210         return std::make_shared<on_next_notification>(std::move(value));
211     }
212 
on_completedrxcpp::notifications::notification213     static type on_completed() {
214         return std::make_shared<on_completed_notification>();
215     }
216 
217     template<typename Exception>
on_errorrxcpp::notifications::notification218     static type on_error(Exception&& e) {
219         return make_on_error(typename std::conditional<
220             std::is_same<rxu::decay_t<Exception>, rxu::error_ptr>::value,
221                 exception_ptr_tag, exception_tag>::type(),
222             std::forward<Exception>(e));
223     }
224 };
225 
226 template<class T>
operator ==(const std::shared_ptr<detail::notification_base<T>> & lhs,const std::shared_ptr<detail::notification_base<T>> & rhs)227 bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) {
228     if (!lhs && !rhs) {return true;}
229     if (!lhs || !rhs) {return false;}
230     return lhs->equals(rhs);
231 }
232 
233 template<class T>
operator <<(std::ostream & os,const std::shared_ptr<detail::notification_base<T>> & n)234 std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) {
235     n->out(os);
236     return os;
237 }
238 
239 
/// Pairs a value of type T with the long `time` it was recorded at.
template<class T>
class recorded
{
    long time_;
    T value_;
public:
    /// @param t  the time to associate with the value
    /// @param v  the value; taken by value and moved into place so that
    ///           rvalue arguments are never copied
    recorded(long t, T v)
        : time_(t), value_(std::move(v)) {
    }
    /// Returns the time given at construction.
    long time() const {
        return time_;
    }
    /// Returns a reference to the stored value.
    const T& value() const {
        return value_;
    }
};
256 
257 template<class T>
operator ==(recorded<T> lhs,recorded<T> rhs)258 bool operator == (recorded<T> lhs, recorded<T> rhs) {
259     return lhs.time() == rhs.time() && lhs.value() == rhs.value();
260 }
261 
262 template<class T>
operator <<(std::ostream & out,const recorded<T> & r)263 std::ostream& operator<< (std::ostream& out, const recorded<T>& r) {
264     out << "@" << r.time() << "-" << r.value();
265     return out;
266 }
267 
268 }
269 namespace rxn=notifications;
270 
operator <<(std::ostream & out,const std::vector<rxcpp::notifications::subscription> & vs)271 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) {
272     return rxcpp::notifications::detail::ostreamvector(out, vs);
273 }
274 template<class T>
operator <<(std::ostream & out,const std::vector<rxcpp::notifications::recorded<T>> & vr)275 inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) {
276     return rxcpp::notifications::detail::ostreamvector(out, vr);
277 }
278 
279 }
280 
281 #endif
282