1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_GROUPED_OBSERVABLE_HPP)
6 #define RXCPP_RX_GROUPED_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
/// Compile-time check that `Source` exposes a nullary member `on_get_key()`
/// whose return type is exactly the decayed `K`.
///
/// `value` is true only when `source.on_get_key()` is well-formed for an lvalue
/// of type `Source` and the type of that call expression is the same as `K`
/// with references and cv-qualifiers removed. The result type itself is not
/// decayed, so an accessor returning e.g. `const K&` does not satisfy the check.
template<class K, class Source>
struct has_on_get_key_for
{
    // Result type of the fallback overload; distinct from any key type so that
    // a missing on_get_key() can never compare equal to the expected key.
    struct not_void {};

    // Preferred overload (an int argument beats the ellipsis). It only takes
    // part in overload resolution when the call expression compiles.
    // std::declval<CS&>() produces the lvalue without forming a pointer to CS,
    // so a reference-qualified Source is probed as the type it refers to.
    template<class CS>
    static auto check(int) -> decltype(std::declval<CS&>().on_get_key());

    // Fallback chosen when CS has no callable on_get_key().
    template<class CS>
    static not_void check(...);

    typedef decltype(check<Source>(0)) detail_result;
    static const bool value = std::is_same<detail_result, typename std::decay<K>::type>::value;
};
26 
27 }
28 
29 template<class K, class T>
30 class dynamic_grouped_observable
31     : public dynamic_observable<T>
32 {
33 public:
34     typedef rxu::decay_t<K> key_type;
35     typedef tag_dynamic_grouped_observable dynamic_observable_tag;
36 
37 private:
38     struct state_type
39         : public std::enable_shared_from_this<state_type>
40     {
41         typedef std::function<key_type()> ongetkey_type;
42 
43         ongetkey_type on_get_key;
44     };
45     std::shared_ptr<state_type> state;
46 
47     template<class U, class V>
48     friend bool operator==(const dynamic_grouped_observable<U, V>&, const dynamic_grouped_observable<U, V>&);
49 
50     template<class U, class V>
construct(const dynamic_grouped_observable<U,V> & o,const tag_dynamic_grouped_observable &)51     void construct(const dynamic_grouped_observable<U, V>& o, const tag_dynamic_grouped_observable&) {
52         state = o.state;
53     }
54 
55     template<class U, class V>
construct(dynamic_grouped_observable<U,V> && o,const tag_dynamic_grouped_observable &)56     void construct(dynamic_grouped_observable<U, V>&& o, const tag_dynamic_grouped_observable&) {
57         state = std::move(o.state);
58     }
59 
60     template<class SO>
construct(SO && source,const rxs::tag_source &)61     void construct(SO&& source, const rxs::tag_source&) {
62         auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
63         state->on_get_key = [so]() mutable {
64             return so->on_get_key();
65         };
66     }
67 
68 public:
69 
dynamic_grouped_observable()70     dynamic_grouped_observable()
71     {
72     }
73 
74     template<class SOF>
dynamic_grouped_observable(SOF sof)75     explicit dynamic_grouped_observable(SOF sof)
76         : dynamic_observable<T>(sof)
77         , state(std::make_shared<state_type>())
78     {
79         construct(std::move(sof),
80                   typename std::conditional<is_dynamic_grouped_observable<SOF>::value, tag_dynamic_grouped_observable, rxs::tag_source>::type());
81     }
82 
83     template<class SF, class CF>
dynamic_grouped_observable(SF && sf,CF && cf)84     dynamic_grouped_observable(SF&& sf, CF&& cf)
85         : dynamic_observable<T>(std::forward<SF>(sf))
86         , state(std::make_shared<state_type>())
87     {
88         state->on_connect = std::forward<CF>(cf);
89     }
90 
91     using dynamic_observable<T>::on_subscribe;
92 
on_get_key() const93     key_type on_get_key() const {
94         return state->on_get_key();
95     }
96 };
97 
98 template<class K, class T>
operator ==(const dynamic_grouped_observable<K,T> & lhs,const dynamic_grouped_observable<K,T> & rhs)99 inline bool operator==(const dynamic_grouped_observable<K, T>& lhs, const dynamic_grouped_observable<K, T>& rhs) {
100     return lhs.state == rhs.state;
101 }
102 template<class K, class T>
operator !=(const dynamic_grouped_observable<K,T> & lhs,const dynamic_grouped_observable<K,T> & rhs)103 inline bool operator!=(const dynamic_grouped_observable<K, T>& lhs, const dynamic_grouped_observable<K, T>& rhs) {
104     return !(lhs == rhs);
105 }
106 
107 template<class K, class T, class Source>
make_dynamic_grouped_observable(Source && s)108 grouped_observable<K, T> make_dynamic_grouped_observable(Source&& s) {
109     return grouped_observable<K, T>(dynamic_grouped_observable<K, T>(std::forward<Source>(s)));
110 }
111 
112 
113 
114 /*!
115     \brief a source of observables which each emit values from one category specified by the key selector.
116 
117     \ingroup group-observable
118 
119 */
120 template<class K, class T, class SourceOperator>
121 class grouped_observable
122     : public observable<T, SourceOperator>
123 {
124     typedef grouped_observable<K, T, SourceOperator> this_type;
125     typedef observable<T, SourceOperator> base_type;
126     typedef rxu::decay_t<SourceOperator> source_operator_type;
127 
128     static_assert(detail::has_on_get_key_for<K, source_operator_type>::value, "inner must have on_get_key method key_type()");
129 
130 public:
131     typedef rxu::decay_t<K> key_type;
132     typedef tag_grouped_observable observable_tag;
133 
grouped_observable()134     grouped_observable()
135     {
136     }
137 
grouped_observable(const SourceOperator & o)138     explicit grouped_observable(const SourceOperator& o)
139         : base_type(o)
140     {
141     }
grouped_observable(SourceOperator && o)142     explicit grouped_observable(SourceOperator&& o)
143         : base_type(std::move(o))
144     {
145     }
146 
147     // implicit conversion between observables of the same value_type
148     template<class SO>
grouped_observable(const grouped_observable<K,T,SO> & o)149     grouped_observable(const grouped_observable<K, T, SO>& o)
150         : base_type(o)
151     {}
152     // implicit conversion between observables of the same value_type
153     template<class SO>
grouped_observable(grouped_observable<K,T,SO> && o)154     grouped_observable(grouped_observable<K, T, SO>&& o)
155         : base_type(std::move(o))
156     {}
157 
158     ///
159     /// performs type-forgetting conversion to a new grouped_observable
160     ///
as_dynamic() const161     grouped_observable<K, T> as_dynamic() const {
162         return *this;
163     }
164 
get_key() const165     key_type get_key() const {
166         return base_type::source_operator.on_get_key();
167     }
168 };
169 
170 
171 }
172 
173 //
174 // support range() >> filter() >> subscribe() syntax
175 // '>>' is spelled 'stream'
176 //
177 template<class K, class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::grouped_observable<K,T,SourceOperator> & source,OperatorFactory && of)178 auto operator >> (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
179     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
180     return      source.op(std::forward<OperatorFactory>(of));
181 }
182 
183 //
184 // support range() | filter() | subscribe() syntax
185 // '|' is spelled 'pipe'
186 //
187 template<class K, class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::grouped_observable<K,T,SourceOperator> & source,OperatorFactory && of)188 auto operator | (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
189     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
190     return      source.op(std::forward<OperatorFactory>(of));
191 }
192 
193 #endif
194