1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVER_HPP)
6 #define RXCPP_RX_OBSERVER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 
13 template<class T>
14 struct observer_base
15 {
16     typedef T value_type;
17     typedef tag_observer observer_tag;
18 };
19 
20 namespace detail {
// Default on_next handler: accepts a value and does nothing with it.
template<class T>
struct OnNextEmpty
{
    void operator()(const T& /*ignored*/) const
    {
    }
};
26 struct OnErrorEmpty
27 {
operator ()rxcpp::detail::OnErrorEmpty28     void operator()(rxu::error_ptr) const {
29         // error implicitly ignored, abort
30         std::terminate();
31     }
32 };
33 struct OnErrorIgnore
34 {
operator ()rxcpp::detail::OnErrorIgnore35     void operator()(rxu::error_ptr) const {
36     }
37 };
// Default on_completed handler: completion requires no action.
struct OnCompletedEmpty
{
    void operator()() const
    {
    }
};
42 
43 template<class T, class State, class OnNext>
44 struct OnNextForward
45 {
46     using state_t = rxu::decay_t<State>;
47     using onnext_t = rxu::decay_t<OnNext>;
OnNextForwardrxcpp::detail::OnNextForward48     OnNextForward() : onnext() {}
OnNextForwardrxcpp::detail::OnNextForward49     explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {}
50     onnext_t onnext;
operator ()rxcpp::detail::OnNextForward51     void operator()(state_t& s, T& t) const {
52         onnext(s, t);
53     }
operator ()rxcpp::detail::OnNextForward54     void operator()(state_t& s, T&& t) const {
55         onnext(s, t);
56     }
57 };
58 template<class T, class State>
59 struct OnNextForward<T, State, void>
60 {
61     using state_t = rxu::decay_t<State>;
OnNextForwardrxcpp::detail::OnNextForward62     OnNextForward() {}
operator ()rxcpp::detail::OnNextForward63     void operator()(state_t& s, T& t) const {
64         s.on_next(t);
65     }
operator ()rxcpp::detail::OnNextForward66     void operator()(state_t& s, T&& t) const {
67         s.on_next(t);
68     }
69 };
70 
71 template<class State, class OnError>
72 struct OnErrorForward
73 {
74     using state_t = rxu::decay_t<State>;
75     using onerror_t = rxu::decay_t<OnError>;
OnErrorForwardrxcpp::detail::OnErrorForward76     OnErrorForward() : onerror() {}
OnErrorForwardrxcpp::detail::OnErrorForward77     explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
78     onerror_t onerror;
operator ()rxcpp::detail::OnErrorForward79     void operator()(state_t& s, rxu::error_ptr ep) const {
80         onerror(s, ep);
81     }
82 };
83 template<class State>
84 struct OnErrorForward<State, void>
85 {
86     using state_t = rxu::decay_t<State>;
OnErrorForwardrxcpp::detail::OnErrorForward87     OnErrorForward() {}
operator ()rxcpp::detail::OnErrorForward88     void operator()(state_t& s, rxu::error_ptr ep) const {
89         s.on_error(ep);
90     }
91 };
92 
93 template<class State, class OnCompleted>
94 struct OnCompletedForward
95 {
96     using state_t = rxu::decay_t<State>;
97     using oncompleted_t = rxu::decay_t<OnCompleted>;
OnCompletedForwardrxcpp::detail::OnCompletedForward98     OnCompletedForward() : oncompleted() {}
OnCompletedForwardrxcpp::detail::OnCompletedForward99     explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {}
100     oncompleted_t oncompleted;
operator ()rxcpp::detail::OnCompletedForward101     void operator()(state_t& s) const {
102         oncompleted(s);
103     }
104 };
105 template<class State>
106 struct OnCompletedForward<State, void>
107 {
OnCompletedForwardrxcpp::detail::OnCompletedForward108     OnCompletedForward() {}
operator ()rxcpp::detail::OnCompletedForward109     void operator()(State& s) const {
110         s.on_completed();
111     }
112 };
113 
114 template<class T, class F>
115 struct is_on_next_of
116 {
117     struct not_void {};
118     template<class CT, class CF>
119     static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr));
120     template<class CT, class CF>
121     static not_void check(...);
122 
123     typedef decltype(check<T, rxu::decay_t<F>>(0)) detail_result;
124     static const bool value = std::is_same<detail_result, void>::value;
125 };
126 
127 template<class F>
128 struct is_on_error
129 {
130     struct not_void {};
131     template<class CF>
132     static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr));
133     template<class CF>
134     static not_void check(...);
135 
136     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
137 };
138 
139 template<class State, class F>
140 struct is_on_error_for
141 {
142     struct not_void {};
143     template<class CF>
144     static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr));
145     template<class CF>
146     static not_void check(...);
147 
148     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
149 };
150 
151 template<class F>
152 struct is_on_completed
153 {
154     struct not_void {};
155     template<class CF>
156     static auto check(int) -> decltype((*(CF*)nullptr)());
157     template<class CF>
158     static not_void check(...);
159 
160     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
161 };
162 
163 }
164 
165 
166 /*!
167     \brief consumes values from an observable using `State` that may implement on_next, on_error and on_completed with optional overrides of each function.
168 
169     \tparam T            - the type of value in the stream
170     \tparam State        - the type of the stored state
171     \tparam OnNext       - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called.
172     \tparam OnError      - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called.
173     \tparam OnCompleted  - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called.
174 
175     \ingroup group-core
176 
177 */
178 template<class T, class State, class OnNext, class OnError, class OnCompleted>
179 class observer : public observer_base<T>
180 {
181 public:
182     using this_type = observer<T, State, OnNext, OnError, OnCompleted>;
183     using state_t = rxu::decay_t<State>;
184     using on_next_t = typename std::conditional<
185         !std::is_same<void, OnNext>::value,
186         rxu::decay_t<OnNext>,
187         detail::OnNextForward<T, State, OnNext>>::type;
188     using on_error_t = typename std::conditional<
189         !std::is_same<void, OnError>::value,
190         rxu::decay_t<OnError>,
191         detail::OnErrorForward<State, OnError>>::type;
192     using on_completed_t = typename std::conditional<
193         !std::is_same<void, OnCompleted>::value,
194         rxu::decay_t<OnCompleted>,
195         detail::OnCompletedForward<State, OnCompleted>>::type;
196 
197 private:
198     mutable state_t state;
199     on_next_t onnext;
200     on_error_t onerror;
201     on_completed_t oncompleted;
202 
203 public:
204 
observer(state_t s,on_next_t n=on_next_t (),on_error_t e=on_error_t (),on_completed_t c=on_completed_t ())205     explicit observer(state_t s, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
206         : state(std::move(s))
207         , onnext(std::move(n))
208         , onerror(std::move(e))
209         , oncompleted(std::move(c))
210     {
211     }
observer(state_t s,on_next_t n,on_completed_t c)212     explicit observer(state_t s, on_next_t n, on_completed_t c)
213         : state(std::move(s))
214         , onnext(std::move(n))
215         , onerror(on_error_t())
216         , oncompleted(std::move(c))
217     {
218     }
observer(const this_type & o)219     observer(const this_type& o)
220         : state(o.state)
221         , onnext(o.onnext)
222         , onerror(o.onerror)
223         , oncompleted(o.oncompleted)
224     {
225     }
observer(this_type && o)226     observer(this_type&& o)
227         : state(std::move(o.state))
228         , onnext(std::move(o.onnext))
229         , onerror(std::move(o.onerror))
230         , oncompleted(std::move(o.oncompleted))
231     {
232     }
operator =(this_type o)233     this_type& operator=(this_type o) {
234         state = std::move(o.state);
235         onnext = std::move(o.onnext);
236         onerror = std::move(o.onerror);
237         oncompleted = std::move(o.oncompleted);
238         return *this;
239     }
240 
on_next(T & t) const241     void on_next(T& t) const {
242         onnext(state, t);
243     }
on_next(T && t) const244     void on_next(T&& t) const {
245         onnext(state, std::move(t));
246     }
on_error(rxu::error_ptr e) const247     void on_error(rxu::error_ptr e) const {
248         onerror(state, e);
249     }
on_completed() const250     void on_completed() const {
251         oncompleted(state);
252     }
as_dynamic() const253     observer<T> as_dynamic() const {
254         return observer<T>(*this);
255     }
256 };
257 
258 /*!
259     \brief consumes values from an observable using default empty method implementations with optional overrides of each function.
260 
261     \tparam T            - the type of value in the stream
262     \tparam OnNext       - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty<T> is used.
263     \tparam OnError      - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used.
264     \tparam OnCompleted  - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used.
265 
266     \ingroup group-core
267 
268 */
269 template<class T, class OnNext, class OnError, class OnCompleted>
270 class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T>
271 {
272 public:
273     using this_type = observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>;
274     using on_next_t = typename std::conditional<
275         !std::is_same<void, OnNext>::value,
276         rxu::decay_t<OnNext>,
277         detail::OnNextEmpty<T>>::type;
278     using on_error_t = typename std::conditional<
279         !std::is_same<void, OnError>::value,
280         rxu::decay_t<OnError>,
281         detail::OnErrorEmpty>::type;
282     using on_completed_t = typename std::conditional<
283         !std::is_same<void, OnCompleted>::value,
284         rxu::decay_t<OnCompleted>,
285         detail::OnCompletedEmpty>::type;
286 
287 private:
288     on_next_t onnext;
289     on_error_t onerror;
290     on_completed_t oncompleted;
291 
292 public:
293     static_assert(detail::is_on_next_of<T, on_next_t>::value,     "Function supplied for on_next must be a function with the signature void(T);");
294     static_assert(detail::is_on_error<on_error_t>::value,         "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);");
295     static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
296 
observer()297     observer()
298         : onnext(on_next_t())
299         , onerror(on_error_t())
300         , oncompleted(on_completed_t())
301     {
302     }
303 
observer(on_next_t n,on_error_t e=on_error_t (),on_completed_t c=on_completed_t ())304     explicit observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
305         : onnext(std::move(n))
306         , onerror(std::move(e))
307         , oncompleted(std::move(c))
308     {
309     }
observer(const this_type & o)310     observer(const this_type& o)
311         : onnext(o.onnext)
312         , onerror(o.onerror)
313         , oncompleted(o.oncompleted)
314     {
315     }
observer(this_type && o)316     observer(this_type&& o)
317         : onnext(std::move(o.onnext))
318         , onerror(std::move(o.onerror))
319         , oncompleted(std::move(o.oncompleted))
320     {
321     }
operator =(this_type o)322     this_type& operator=(this_type o) {
323         onnext = std::move(o.onnext);
324         onerror = std::move(o.onerror);
325         oncompleted = std::move(o.oncompleted);
326         return *this;
327     }
328 
on_next(T & t) const329     void on_next(T& t) const {
330         onnext(t);
331     }
on_next(T && t) const332     void on_next(T&& t) const {
333         onnext(std::move(t));
334     }
on_error(rxu::error_ptr e) const335     void on_error(rxu::error_ptr e) const {
336         onerror(e);
337     }
on_completed() const338     void on_completed() const {
339         oncompleted();
340     }
as_dynamic() const341     observer<T> as_dynamic() const {
342         return observer<T>(*this);
343     }
344 };
345 
346 namespace detail
347 {
348 
349 template<class T>
350 struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>>
351 {
~virtual_observerrxcpp::detail::virtual_observer352     virtual ~virtual_observer() {}
on_nextrxcpp::detail::virtual_observer353     virtual void on_next(T&) const {};
on_nextrxcpp::detail::virtual_observer354     virtual void on_next(T&&) const {};
on_errorrxcpp::detail::virtual_observer355     virtual void on_error(rxu::error_ptr) const {};
on_completedrxcpp::detail::virtual_observer356     virtual void on_completed() const {};
357 };
358 
359 template<class T, class Observer>
360 struct specific_observer : public virtual_observer<T>
361 {
specific_observerrxcpp::detail::specific_observer362     explicit specific_observer(Observer o)
363         : destination(std::move(o))
364     {
365     }
366 
367     Observer destination;
on_nextrxcpp::detail::specific_observer368     virtual void on_next(T& t) const {
369         destination.on_next(t);
370     }
on_nextrxcpp::detail::specific_observer371     virtual void on_next(T&& t) const {
372         destination.on_next(std::move(t));
373     }
on_errorrxcpp::detail::specific_observer374     virtual void on_error(rxu::error_ptr e) const {
375         destination.on_error(e);
376     }
on_completedrxcpp::detail::specific_observer377     virtual void on_completed() const {
378         destination.on_completed();
379     }
380 };
381 
382 }
383 
384 /*!
385     \brief consumes values from an observable using type-forgetting (shared allocated state with virtual methods)
386 
387     \tparam T            - the type of value in the stream
388 
389     \ingroup group-core
390 
391 */
392 template<class T>
393 class observer<T, void, void, void, void> : public observer_base<T>
394 {
395 public:
396     typedef tag_dynamic_observer dynamic_observer_tag;
397 
398 private:
399     using this_type = observer<T, void, void, void, void>;
400     using base_type = observer_base<T>;
401     using virtual_observer = detail::virtual_observer<T>;
402 
403     std::shared_ptr<virtual_observer> destination;
404 
405     template<class Observer>
make_destination(Observer o)406     static auto make_destination(Observer o)
407         -> std::shared_ptr<virtual_observer> {
408         return std::make_shared<detail::specific_observer<T, Observer>>(std::move(o));
409     }
410 
411 public:
observer()412     observer()
413     {
414     }
observer(const this_type & o)415     observer(const this_type& o)
416         : destination(o.destination)
417     {
418     }
observer(this_type && o)419     observer(this_type&& o)
420         : destination(std::move(o.destination))
421     {
422     }
423 
424     template<class Observer>
observer(Observer o)425     explicit observer(Observer o)
426         : destination(make_destination(std::move(o)))
427     {
428     }
429 
operator =(this_type o)430     this_type& operator=(this_type o) {
431         destination = std::move(o.destination);
432         return *this;
433     }
434 
435     // perfect forwarding delays the copy of the value.
436     template<class V>
on_next(V && v) const437     void on_next(V&& v) const {
438         if (destination) {
439             destination->on_next(std::forward<V>(v));
440         }
441     }
on_error(rxu::error_ptr e) const442     void on_error(rxu::error_ptr e) const {
443         if (destination) {
444             destination->on_error(e);
445         }
446     }
on_completed() const447     void on_completed() const {
448         if (destination) {
449             destination->on_completed();
450         }
451     }
452 
as_dynamic() const453     observer<T> as_dynamic() const {
454         return *this;
455     }
456 };
457 
458 template<class T, class DefaultOnError = detail::OnErrorEmpty>
make_observer()459 auto make_observer()
460     ->      observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError> {
461     return  observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError>();
462 }
463 
464 template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted>
make_observer(observer<U,State,OnNext,OnError,OnCompleted> o)465 auto make_observer(observer<U, State, OnNext, OnError, OnCompleted> o)
466     ->      observer<T, State, OnNext, OnError, OnCompleted> {
467     return  observer<T, State, OnNext, OnError, OnCompleted>(std::move(o));
468 }
469 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
make_observer(Observer ob)470 auto make_observer(Observer ob)
471     -> typename std::enable_if<
472         !detail::is_on_next_of<T, Observer>::value &&
473         !detail::is_on_error<Observer>::value &&
474         is_observer<Observer>::value,
475             Observer>::type {
476     return  std::move(ob);
477 }
478 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
make_observer(Observer ob)479 auto make_observer(Observer ob)
480     -> typename std::enable_if<
481         !detail::is_on_next_of<T, Observer>::value &&
482         !detail::is_on_error<Observer>::value &&
483         !is_observer<Observer>::value,
484             observer<T, Observer>>::type {
485     return  observer<T, Observer>(std::move(ob));
486 }
487 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext>
make_observer(OnNext on)488 auto make_observer(OnNext on)
489     -> typename std::enable_if<
490         detail::is_on_next_of<T, OnNext>::value,
491             observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>>::type {
492     return  observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>(
493                         std::move(on));
494 }
495 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError>
make_observer(OnError oe)496 auto make_observer(OnError oe)
497     -> typename std::enable_if<
498         !detail::is_on_next_of<T, OnError>::value &&
499         detail::is_on_error<OnError>::value,
500             observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>>::type {
501     return  observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>(
502                         detail::OnNextEmpty<T>(), std::move(oe));
503 }
504 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError>
make_observer(OnNext on,OnError oe)505 auto make_observer(OnNext on, OnError oe)
506     -> typename std::enable_if<
507         detail::is_on_next_of<T, OnNext>::value &&
508         detail::is_on_error<OnError>::value,
509             observer<T, detail::stateless_observer_tag, OnNext, OnError>>::type {
510     return  observer<T, detail::stateless_observer_tag, OnNext, OnError>(
511                         std::move(on), std::move(oe));
512 }
513 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted>
make_observer(OnNext on,OnCompleted oc)514 auto make_observer(OnNext on, OnCompleted oc)
515     -> typename std::enable_if<
516         detail::is_on_next_of<T, OnNext>::value &&
517         detail::is_on_completed<OnCompleted>::value,
518             observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>>::type {
519     return  observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>(
520                         std::move(on), DefaultOnError(), std::move(oc));
521 }
522 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted>
make_observer(OnNext on,OnError oe,OnCompleted oc)523 auto make_observer(OnNext on, OnError oe, OnCompleted oc)
524     -> typename std::enable_if<
525         detail::is_on_next_of<T, OnNext>::value &&
526         detail::is_on_error<OnError>::value &&
527         detail::is_on_completed<OnCompleted>::value,
528             observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>::type {
529     return  observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(
530                         std::move(on), std::move(oe), std::move(oc));
531 }
532 
533 
534 template<class T, class State, class OnNext>
make_observer(State os,OnNext on)535 auto make_observer(State os, OnNext on)
536     -> typename std::enable_if<
537         !detail::is_on_next_of<T, State>::value &&
538         !detail::is_on_error<State>::value,
539             observer<T, State, OnNext>>::type {
540     return  observer<T, State, OnNext>(
541                         std::move(os), std::move(on));
542 }
543 template<class T, class State, class OnError>
make_observer(State os,OnError oe)544 auto make_observer(State os, OnError oe)
545     -> typename std::enable_if<
546         !detail::is_on_next_of<T, State>::value &&
547         !detail::is_on_error<State>::value &&
548         detail::is_on_error_for<State, OnError>::value,
549             observer<T, State, detail::OnNextEmpty<T>, OnError>>::type {
550     return  observer<T, State, detail::OnNextEmpty<T>, OnError>(
551                         std::move(os), detail::OnNextEmpty<T>(), std::move(oe));
552 }
553 template<class T, class State, class OnNext, class OnError>
make_observer(State os,OnNext on,OnError oe)554 auto make_observer(State os, OnNext on, OnError oe)
555     -> typename std::enable_if<
556         !detail::is_on_next_of<T, State>::value &&
557         !detail::is_on_error<State>::value &&
558         detail::is_on_error_for<State, OnError>::value,
559             observer<T, State, OnNext, OnError>>::type {
560     return  observer<T, State, OnNext, OnError>(
561                         std::move(os), std::move(on), std::move(oe));
562 }
563 template<class T, class State, class OnNext, class OnCompleted>
make_observer(State os,OnNext on,OnCompleted oc)564 auto make_observer(State os, OnNext on, OnCompleted oc)
565     -> typename std::enable_if<
566         !detail::is_on_next_of<T, State>::value &&
567         !detail::is_on_error<State>::value,
568             observer<T, State, OnNext, void, OnCompleted>>::type {
569     return  observer<T, State, OnNext, void, OnCompleted>(
570                         std::move(os), std::move(on), std::move(oc));
571 }
572 template<class T, class State, class OnNext, class OnError, class OnCompleted>
make_observer(State os,OnNext on,OnError oe,OnCompleted oc)573 auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc)
574     -> typename std::enable_if<
575         !detail::is_on_next_of<T, State>::value &&
576         !detail::is_on_error<State>::value &&
577         detail::is_on_error_for<State, OnError>::value,
578             observer<T, State, OnNext, OnError, OnCompleted>>::type {
579     return  observer<T, State, OnNext, OnError, OnCompleted>(
580                         std::move(os), std::move(on), std::move(oe), std::move(oc));
581 }
582 
583 template<class T, class Observer>
make_observer_dynamic(Observer o)584 auto make_observer_dynamic(Observer o)
585     -> typename std::enable_if<
586         !detail::is_on_next_of<T, Observer>::value,
587             observer<T>>::type {
588     return  observer<T>(std::move(o));
589 }
590 template<class T, class OnNext>
make_observer_dynamic(OnNext && on)591 auto make_observer_dynamic(OnNext&& on)
592     -> typename std::enable_if<
593         detail::is_on_next_of<T, OnNext>::value,
594             observer<T>>::type {
595     return  observer<T>(
596                 make_observer<T>(std::forward<OnNext>(on)));
597 }
598 template<class T, class OnNext, class OnError>
make_observer_dynamic(OnNext && on,OnError && oe)599 auto make_observer_dynamic(OnNext&& on, OnError&& oe)
600     -> typename std::enable_if<
601         detail::is_on_next_of<T, OnNext>::value &&
602         detail::is_on_error<OnError>::value,
603             observer<T>>::type {
604     return  observer<T>(
605                 make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe)));
606 }
607 template<class T, class OnNext, class OnCompleted>
make_observer_dynamic(OnNext && on,OnCompleted && oc)608 auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
609     -> typename std::enable_if<
610         detail::is_on_next_of<T, OnNext>::value &&
611         detail::is_on_completed<OnCompleted>::value,
612             observer<T>>::type {
613     return  observer<T>(
614                 make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc)));
615 }
616 template<class T, class OnNext, class OnError, class OnCompleted>
make_observer_dynamic(OnNext && on,OnError && oe,OnCompleted && oc)617 auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
618     -> typename std::enable_if<
619         detail::is_on_next_of<T, OnNext>::value &&
620         detail::is_on_error<OnError>::value &&
621         detail::is_on_completed<OnCompleted>::value,
622             observer<T>>::type {
623     return  observer<T>(
624                 make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc)));
625 }
626 
627 namespace detail {
628 
629 template<class F>
630 struct maybe_from_result
631 {
632     typedef decltype((*(F*)nullptr)()) decl_result_type;
633     typedef rxu::decay_t<decl_result_type> result_type;
634     typedef rxu::maybe<result_type> type;
635 };
636 
637 }
638 
639 template<class F, class OnError>
on_exception(const F & f,const OnError & c)640 auto on_exception(const F& f, const OnError& c)
641     ->  typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
642     typename detail::maybe_from_result<F>::type r;
643     RXCPP_TRY {
644         r.reset(f());
645     } RXCPP_CATCH(...) {
646         c(rxu::current_exception());
647     }
648     return r;
649 }
650 
651 template<class F, class Subscriber>
on_exception(const F & f,const Subscriber & s)652 auto on_exception(const F& f, const Subscriber& s)
653     ->  typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
654     typename detail::maybe_from_result<F>::type r;
655     RXCPP_TRY {
656         r.reset(f());
657     } RXCPP_CATCH(...) {
658         s.on_error(rxu::current_exception());
659     }
660     return r;
661 }
662 
663 }
664 
665 #endif
666