1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP)
6 #define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
14 template<class T>
15 struct has_on_connect
16 {
17     struct not_void {};
18     template<class CT>
19     static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription()));
20     template<class CT>
21     static not_void check(...);
22 
23     typedef decltype(check<T>(0)) detail_result;
24     static const bool value = std::is_same<detail_result, void>::value;
25 };
26 
27 }
28 
29 template<class T>
30 class dynamic_connectable_observable
31     : public dynamic_observable<T>
32 {
33     struct state_type
34         : public std::enable_shared_from_this<state_type>
35     {
36         typedef std::function<void(composite_subscription)> onconnect_type;
37 
38         onconnect_type on_connect;
39     };
40     std::shared_ptr<state_type> state;
41 
42     template<class U>
construct(const dynamic_observable<U> & o,tag_dynamic_observable &&)43     void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
44         state = o.state;
45     }
46 
47     template<class U>
construct(dynamic_observable<U> && o,tag_dynamic_observable &&)48     void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
49         state = std::move(o.state);
50     }
51 
52     template<class SO>
construct(SO && source,rxs::tag_source &&)53     void construct(SO&& source, rxs::tag_source&&) {
54         auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
55         state->on_connect = [so](composite_subscription cs) mutable {
56             so->on_connect(std::move(cs));
57         };
58     }
59 
60 public:
61 
62     typedef tag_dynamic_observable dynamic_observable_tag;
63 
dynamic_connectable_observable()64     dynamic_connectable_observable()
65     {
66     }
67 
68     template<class SOF>
dynamic_connectable_observable(SOF sof)69     explicit dynamic_connectable_observable(SOF sof)
70         : dynamic_observable<T>(sof)
71         , state(std::make_shared<state_type>())
72     {
73         construct(std::move(sof),
74                   typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
75     }
76 
77     template<class SF, class CF>
dynamic_connectable_observable(SF && sf,CF && cf)78     dynamic_connectable_observable(SF&& sf, CF&& cf)
79         : dynamic_observable<T>(std::forward<SF>(sf))
80         , state(std::make_shared<state_type>())
81     {
82         state->on_connect = std::forward<CF>(cf);
83     }
84 
85     using dynamic_observable<T>::on_subscribe;
86 
on_connect(composite_subscription cs) const87     void on_connect(composite_subscription cs) const {
88         state->on_connect(std::move(cs));
89     }
90 };
91 
92 template<class T, class Source>
make_dynamic_connectable_observable(Source && s)93 connectable_observable<T> make_dynamic_connectable_observable(Source&& s) {
94     return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s)));
95 }
96 
97 
98 /*!
99     \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
100 
101     \ingroup group-observable
102 
103 */
104 template<class T, class SourceOperator>
105 class connectable_observable
106     : public observable<T, SourceOperator>
107 {
108     typedef connectable_observable<T, SourceOperator> this_type;
109     typedef observable<T, SourceOperator> base_type;
110     typedef rxu::decay_t<SourceOperator> source_operator_type;
111 
112     static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)");
113 
114 public:
115     typedef tag_connectable_observable observable_tag;
116 
connectable_observable()117     connectable_observable()
118     {
119     }
120 
connectable_observable(const SourceOperator & o)121     explicit connectable_observable(const SourceOperator& o)
122         : base_type(o)
123     {
124     }
connectable_observable(SourceOperator && o)125     explicit connectable_observable(SourceOperator&& o)
126         : base_type(std::move(o))
127     {
128     }
129 
130     // implicit conversion between observables of the same value_type
131     template<class SO>
connectable_observable(const connectable_observable<T,SO> & o)132     connectable_observable(const connectable_observable<T, SO>& o)
133         : base_type(o)
134     {}
135     // implicit conversion between observables of the same value_type
136     template<class SO>
connectable_observable(connectable_observable<T,SO> && o)137     connectable_observable(connectable_observable<T, SO>&& o)
138         : base_type(std::move(o))
139     {}
140 
141     ///
142     /// takes any function that will take this observable and produce a result value.
143     /// this is intended to allow externally defined operators, that use subscribe,
144     /// to be connected into the expression.
145     ///
146     template<class OperatorFactory>
op(OperatorFactory && of) const147     auto op(OperatorFactory&& of) const
148         -> decltype(of(*(const this_type*)nullptr)) {
149         return      of(*this);
150         static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
151     }
152 
153     ///
154     /// performs type-forgetting conversion to a new composite_observable
155     ///
as_dynamic()156     connectable_observable<T> as_dynamic() {
157         return *this;
158     }
159 
connect(composite_subscription cs=composite_subscription ())160     composite_subscription connect(composite_subscription cs = composite_subscription()) {
161         base_type::source_operator.on_connect(cs);
162         return cs;
163     }
164 
165     /*! @copydoc rx-ref_count.hpp
166      */
167     template<class... AN>
ref_count(AN...an) const168     auto ref_count(AN... an) const
169         /// \cond SHOW_SERVICE_MEMBERS
170         -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
171         /// \endcond
172     {
173         return      observable_member(ref_count_tag{},                *this, std::forward<AN>(an)...);
174     }
175 
176     /*! @copydoc rx-connect_forever.hpp
177      */
178     template<class... AN>
connect_forever(AN...an) const179     auto connect_forever(AN... an) const
180         /// \cond SHOW_SERVICE_MEMBERS
181         -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
182         /// \endcond
183     {
184         return      observable_member(connect_forever_tag{},                *this, std::forward<AN>(an)...);
185     }
186 };
187 
188 
189 }
190 
191 //
192 // support range() >> filter() >> subscribe() syntax
193 // '>>' is spelled 'stream'
194 //
195 template<class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::connectable_observable<T,SourceOperator> & source,OperatorFactory && of)196 auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
197     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
198     return      source.op(std::forward<OperatorFactory>(of));
199 }
200 
201 //
202 // support range() | filter() | subscribe() syntax
203 // '|' is spelled 'pipe'
204 //
205 template<class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::connectable_observable<T,SourceOperator> & source,OperatorFactory && of)206 auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
207     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
208     return      source.op(std::forward<OperatorFactory>(of));
209 }
210 
211 #endif
212