1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-publish.hpp
6 
7     \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
8            Turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.
9 
10     \tparam  T  the type of the emitted item (optional).
11 
12     \param  first  an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional).
13     \param  cs  the subscription to control lifetime (optional).
14 
15     \return  rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
16 
17     \sample
18     \snippet publish.cpp publish subject sample
19     \snippet output.txt publish subject sample
20 
21     \sample
22     \snippet publish.cpp publish behavior sample
23     \snippet output.txt publish behavior sample
24 
25     \sample
26     \snippet publish.cpp publish diamond samethread sample
27     \snippet output.txt publish diamond samethread sample
28 
29     \sample
30     \snippet publish.cpp publish diamond bgthread sample
31     \snippet output.txt publish diamond bgthread sample
32 
33     \sample
34     \snippet ref_count.cpp ref_count other diamond sample
35     \snippet output.txt ref_count other diamond sample
36 */
37 
38 #if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
39 #define RXCPP_OPERATORS_RX_PUBLISH_HPP
40 
41 #include "../rx-includes.hpp"
42 #include "./rx-multicast.hpp"
43 
44 namespace rxcpp {
45 
46 namespace operators {
47 
48 namespace detail {
49 
50 template<class... AN>
51 struct publish_invalid_arguments {};
52 
53 template<class... AN>
54 struct publish_invalid : public rxo::operator_base<publish_invalid_arguments<AN...>> {
55     using type = observable<publish_invalid_arguments<AN...>, publish_invalid<AN...>>;
56 };
57 template<class... AN>
58 using publish_invalid_t = typename publish_invalid<AN...>::type;
59 
60 }
61 
62 /*! @copydoc rx-publish.hpp
63 */
64 template<class... AN>
publish(AN &&...an)65 auto publish(AN&&... an)
66     ->      operator_factory<publish_tag, AN...> {
67      return operator_factory<publish_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
68 }
69 
70 /*! \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
71 
72     \tparam  Coordination  the type of the scheduler.
73 
74     \param  cn  a scheduler all values are queued and delivered on.
75     \param  cs  the subscription to control lifetime (optional).
76 
77     \return  rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
78 
79     \sample
80     \snippet publish.cpp publish_synchronized sample
81     \snippet output.txt publish_synchronized sample
82 */
83 template<class... AN>
publish_synchronized(AN &&...an)84 auto publish_synchronized(AN&&... an)
85     ->      operator_factory<publish_synchronized_tag, AN...> {
86      return operator_factory<publish_synchronized_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
87 }
88 
89 }
90 
91 template<>
92 struct member_overload<publish_tag>
93 {
94     template<class Observable,
95         class Enabled = rxu::enable_if_all_true_type_t<
96             is_observable<Observable>>,
97         class SourceValue = rxu::value_type_t<Observable>,
98         class Subject = rxsub::subject<SourceValue>,
99         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
100         class Result = connectable_observable<SourceValue, Multicast>
101         >
memberrxcpp::member_overload102     static Result member(Observable&& o) {
103         return Result(Multicast(std::forward<Observable>(o), Subject(composite_subscription())));
104     }
105 
106     template<class Observable,
107         class Enabled = rxu::enable_if_all_true_type_t<
108             is_observable<Observable>>,
109         class SourceValue = rxu::value_type_t<Observable>,
110         class Subject = rxsub::subject<SourceValue>,
111         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
112         class Result = connectable_observable<SourceValue, Multicast>
113         >
memberrxcpp::member_overload114     static Result member(Observable&& o, composite_subscription cs) {
115         return Result(Multicast(std::forward<Observable>(o), Subject(cs)));
116     }
117 
118     template<class Observable, class T,
119         class Enabled = rxu::enable_if_all_true_type_t<
120             is_observable<Observable>>,
121         class SourceValue = rxu::value_type_t<Observable>,
122         class Subject = rxsub::behavior<SourceValue>,
123         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
124         class Result = connectable_observable<SourceValue, Multicast>
125         >
memberrxcpp::member_overload126     static Result member(Observable&& o, T first, composite_subscription cs = composite_subscription()) {
127         return Result(Multicast(std::forward<Observable>(o), Subject(first, cs)));
128     }
129 
130     template<class... AN>
memberrxcpp::member_overload131     static operators::detail::publish_invalid_t<AN...> member(AN...) {
132         std::terminate();
133         return {};
134         static_assert(sizeof...(AN) == 10000, "publish takes (optional CompositeSubscription) or (T, optional CompositeSubscription)");
135     }
136 };
137 
138 template<>
139 struct member_overload<publish_synchronized_tag>
140 {
141     template<class Observable, class Coordination,
142         class Enabled = rxu::enable_if_all_true_type_t<
143             is_observable<Observable>,
144             is_coordination<Coordination>>,
145         class SourceValue = rxu::value_type_t<Observable>,
146         class Subject = rxsub::synchronize<SourceValue, rxu::decay_t<Coordination>>,
147         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
148         class Result = connectable_observable<SourceValue, Multicast>
149         >
memberrxcpp::member_overload150     static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
151         return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs)));
152     }
153 
154     template<class... AN>
memberrxcpp::member_overload155     static operators::detail::publish_invalid_t<AN...> member(AN...) {
156         std::terminate();
157         return {};
158         static_assert(sizeof...(AN) == 10000, "publish_synchronized takes (Coordination, optional CompositeSubscription)");
159     }
160 };
161 
162 }
163 
164 #endif
165