1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_ERROR_HPP)
6 #define RXCPP_SOURCES_RX_ERROR_HPP
7 
8 #include "../rx-includes.hpp"
9 
/*! \file rx-error.hpp

    \brief Returns an observable that sends no items to the observer and immediately generates an error, on the specified scheduler.

    \tparam T             the type of the items the observable would emit (none are ever emitted)
    \tparam Exception     the type of the error
    \tparam Coordination  the type of the scheduler (optional)

    \param  e   the error to be passed to observers
    \param  cn  the scheduler to use for delivering the error (optional)

    \return  Observable that sends no items to the observer and immediately generates an error.

    \sample
    \snippet error.cpp error sample
    \snippet output.txt error sample

    \sample
    \snippet error.cpp threaded error sample
    \snippet output.txt threaded error sample
*/
31 
32 namespace rxcpp {
33 
34 namespace sources {
35 
36 namespace detail {
37 
38 template<class T, class Coordination>
39 struct error : public source_base<T>
40 {
41     typedef error<T, Coordination> this_type;
42 
43     typedef rxu::decay_t<Coordination> coordination_type;
44 
45     typedef typename coordination_type::coordinator_type coordinator_type;
46 
47     struct error_initial_type
48     {
error_initial_typerxcpp::sources::detail::error::error_initial_type49         error_initial_type(rxu::error_ptr e, coordination_type cn)
50             : exception(e)
51             , coordination(std::move(cn))
52         {
53         }
54         rxu::error_ptr exception;
55         coordination_type coordination;
56     };
57     error_initial_type initial;
58 
errorrxcpp::sources::detail::error59     error(rxu::error_ptr e, coordination_type cn)
60         : initial(e, std::move(cn))
61     {
62     }
63 
64     template<class Subscriber>
on_subscriberxcpp::sources::detail::error65     void on_subscribe(Subscriber o) const {
66 
67         // creates a worker whose lifetime is the same as this subscription
68         auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
69         auto controller = coordinator.get_worker();
70         auto exception = initial.exception;
71 
72         auto producer = [=](const rxsc::schedulable&){
73             auto& dest = o;
74             if (!dest.is_subscribed()) {
75                 // terminate loop
76                 return;
77             }
78 
79             dest.on_error(exception);
80             // o is unsubscribed
81         };
82         auto selectedProducer = on_exception(
83             [&](){return coordinator.act(producer);},
84             o);
85         if (selectedProducer.empty()) {
86             return;
87         }
88         controller.schedule(selectedProducer.get());
89     }
90 };
91 
// Overload-selection tags for make_error.

// Selects the make_error overload that receives an rxu::error_ptr.
struct throw_ptr_tag
{
};

// Selects the make_error overload that receives an error instance and
// converts it with rxu::make_error_ptr.
struct throw_instance_tag
{
};
94 
95 template <class T, class Coordination>
make_error(throw_ptr_tag &&,rxu::error_ptr exception,Coordination cn)96 auto make_error(throw_ptr_tag&&, rxu::error_ptr exception, Coordination cn)
97     ->      observable<T, error<T, Coordination>> {
98     return  observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
99 }
100 
101 template <class T, class E, class Coordination>
make_error(throw_instance_tag &&,E e,Coordination cn)102 auto make_error(throw_instance_tag&&, E e, Coordination cn)
103     ->      observable<T, error<T, Coordination>> {
104     rxu::error_ptr ep = rxu::make_error_ptr(e);
105     return  observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(ep), std::move(cn)));
106 }
107 
108 }
109 
110 }
111 
112 namespace sources {
113 
114 /*! @copydoc rx-error.hpp
115  */
116 template<class T, class E>
error(E e)117 auto error(E e)
118     -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
119     return      detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
120 }
121 /*! @copydoc rx-error.hpp
122  */
123 template<class T, class E, class Coordination>
error(E e,Coordination cn)124 auto error(E e, Coordination cn)
125     -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
126     return      detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
127 }
128 
129 }
130 
131 }
132 
133 #endif
134