1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23     struct not_void {};
24     template<class CS, class CT>
25     static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26     template<class CS, class CT>
27     static not_void check(...);
28 
29     typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30     static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
36 class dynamic_observable
37     : public rxs::source_base<T>
38 {
39     struct state_type
40         : public std::enable_shared_from_this<state_type>
41     {
42         typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44         onsubscribe_type on_subscribe;
45     };
46     std::shared_ptr<state_type> state;
47 
48     template<class U>
49     friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
51     template<class SO>
construct(SO && source,rxs::tag_source &&)52     void construct(SO&& source, rxs::tag_source&&) {
53         rxu::decay_t<SO> so = std::forward<SO>(source);
54         state->on_subscribe = [so](subscriber<T> o) mutable {
55             so.on_subscribe(std::move(o));
56         };
57     }
58 
59     struct tag_function {};
60     template<class F>
construct(F && f,tag_function &&)61     void construct(F&& f, tag_function&&) {
62         state->on_subscribe = std::forward<F>(f);
63     }
64 
65 public:
66 
67     typedef tag_dynamic_observable dynamic_observable_tag;
68 
dynamic_observable()69     dynamic_observable()
70     {
71     }
72 
73     template<class SOF>
dynamic_observable(SOF && sof,typename std::enable_if<!is_dynamic_observable<SOF>::value,void ** >::type=0)74     explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75         : state(std::make_shared<state_type>())
76     {
77         construct(std::forward<SOF>(sof),
78                   typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79     }
80 
on_subscribe(subscriber<T> o) const81     void on_subscribe(subscriber<T> o) const {
82         state->on_subscribe(std::move(o));
83     }
84 
85     template<class Subscriber>
86     typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
on_subscribe(Subscriber o) const87     on_subscribe(Subscriber o) const {
88         state->on_subscribe(o.as_dynamic());
89     }
90 };
91 
92 template<class T>
operator ==(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94     return lhs.state == rhs.state;
95 }
96 template<class T>
operator !=(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98     return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
make_observable_dynamic(Source && s)102 observable<T> make_observable_dynamic(Source&& s) {
103     return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113     typedef typename SO::type type;
114     typedef typename type::value_type value_type;
115     static const bool value = true;
116     typedef observable<value_type, type> observable_type;
117     template<class... AN>
makerxcpp::detail::resolve_observable118     static observable_type make(const Default&, AN&&... an) {
119         return observable_type(type(std::forward<AN>(an)...));
120     }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125     static const bool value = false;
126     typedef Default observable_type;
127     template<class... AN>
makerxcpp::detail::resolve_observable128     static observable_type make(const observable_type& that, const AN&...) {
129         return that;
130     }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135     typedef typename SO::type type;
136     typedef typename type::value_type value_type;
137     static const bool value = true;
138     typedef observable<value_type, type> observable_type;
139     template<class... AN>
makerxcpp::detail::resolve_observable140     static observable_type make(AN&&... an) {
141         return observable_type(type(std::forward<AN>(an)...));
142     }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147     static const bool value = false;
148     typedef void observable_type;
149     template<class... AN>
makerxcpp::detail::resolve_observable150     static observable_type make(const AN&...) {
151     }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
157 struct defer_observable
158     : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
162 /*!
163     \brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.
164 
165     \ingroup group-observable
166 
167 */
168 template<class T, class Observable>
169 class blocking_observable
170 {
171     template<class Obsvbl, class... ArgN>
blocking_subscribe(const Obsvbl & source,bool do_rethrow,ArgN &&...an)172     static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173         -> void {
174         std::mutex lock;
175         std::condition_variable wake;
176         bool disposed = false;
177         rxu::error_ptr error;
178 
179         auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
180 
181         // keep any error to rethrow at the end.
182         auto scbr = make_subscriber<T>(
183             dest,
184             [&](T t){dest.on_next(t);},
185             [&](rxu::error_ptr e){
186                 if (do_rethrow) {
187                     error = e;
188                 } else {
189                     dest.on_error(e);
190                 }
191             },
192             [&](){dest.on_completed();}
193             );
194 
195         auto cs = scbr.get_subscription();
196         cs.add(
197             [&](){
198                 std::unique_lock<std::mutex> guard(lock);
199                 wake.notify_one();
200                 disposed = true;
201             });
202 
203         source.subscribe(std::move(scbr));
204 
205         std::unique_lock<std::mutex> guard(lock);
206         wake.wait(guard,
207             [&](){
208                 return disposed;
209             });
210 
211         if (error) {rxu::rethrow_exception(error);}
212     }
213 
214 public:
215     typedef rxu::decay_t<Observable> observable_type;
216     observable_type source;
~blocking_observable()217     ~blocking_observable()
218     {
219     }
blocking_observable(observable_type s)220     blocking_observable(observable_type s) : source(std::move(s)) {}
221 
222     ///
223     /// `subscribe` will cause this observable to emit values to the provided subscriber.
224     ///
225     /// \return void
226     ///
227     /// \param an... - the arguments are passed to make_subscriber().
228     ///
229     /// callers must provide enough arguments to make a subscriber.
230     /// overrides are supported. thus
231     ///   `subscribe(thesubscriber, composite_subscription())`
232     /// will take `thesubscriber.get_observer()` and the provided
233     /// subscription and subscribe to the new subscriber.
234     /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
235     /// if a subscription or subscriber is not provided then a new subscription will be created.
236     ///
237     template<class... ArgN>
subscribe(ArgN &&...an) const238     auto subscribe(ArgN&&... an) const
239         -> void {
240         return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
241     }
242 
243     ///
244     /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber.
245     ///
246     /// \note  If the source observable calls on_error, the raised exception is rethrown by this method.
247     ///
248     /// \note  If the source observable calls on_error, the `on_error` method on the subscriber will not be called.
249     ///
250     /// \return void
251     ///
252     /// \param an... - the arguments are passed to make_subscriber().
253     ///
254     /// callers must provide enough arguments to make a subscriber.
255     /// overrides are supported. thus
256     ///   `subscribe(thesubscriber, composite_subscription())`
257     /// will take `thesubscriber.get_observer()` and the provided
258     /// subscription and subscribe to the new subscriber.
259     /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
260     /// if a subscription or subscriber is not provided then a new subscription will be created.
261     ///
262     template<class... ArgN>
subscribe_with_rethrow(ArgN &&...an) const263     auto subscribe_with_rethrow(ArgN&&... an) const
264         -> void {
265         return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
266     }
267 
268     /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
269 
270         \return  The first item emitted by this blocking_observable.
271 
272         \note  If the source observable calls on_error, the raised exception is rethrown by this method.
273 
274         \sample
275         When the source observable emits at least one item:
276         \snippet blocking_observable.cpp blocking first sample
277         \snippet output.txt blocking first sample
278 
279         When the source observable is empty:
280         \snippet blocking_observable.cpp blocking first empty sample
281         \snippet output.txt blocking first empty sample
282     */
283     template<class... AN>
first(AN ** ...)284     auto first(AN**...) -> delayed_type_t<T, AN...> const {
285         rxu::maybe<T> result;
286         composite_subscription cs;
287         subscribe_with_rethrow(
288             cs,
289             [&](T v){result.reset(v); cs.unsubscribe();});
290         if (result.empty())
291             rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
292         return result.get();
293         static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
294     }
295 
296     /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
297 
298         \return  The last item emitted by this blocking_observable.
299 
300         \note  If the source observable calls on_error, the raised exception is rethrown by this method.
301 
302         \sample
303         When the source observable emits at least one item:
304         \snippet blocking_observable.cpp blocking last sample
305         \snippet output.txt blocking last sample
306 
307         When the source observable is empty:
308         \snippet blocking_observable.cpp blocking last empty sample
309         \snippet output.txt blocking last empty sample
310     */
311     template<class... AN>
last(AN ** ...)312     auto last(AN**...) -> delayed_type_t<T, AN...> const {
313         rxu::maybe<T> result;
314         subscribe_with_rethrow(
315             [&](T v){result.reset(v);});
316         if (result.empty())
317             rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
318         return result.get();
319         static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
320     }
321 
322     /*! Return the total number of items emitted by this blocking_observable.
323 
324         \return  The total number of items emitted by this blocking_observable.
325 
326         \sample
327         \snippet blocking_observable.cpp blocking count sample
328         \snippet output.txt blocking count sample
329 
330         When the source observable calls on_error:
331         \snippet blocking_observable.cpp blocking count error sample
332         \snippet output.txt blocking count error sample
333     */
count() const334     int count() const {
335         int result = 0;
336         source.count().as_blocking().subscribe_with_rethrow(
337             [&](int v){result = v;});
338         return result;
339     }
340 
341     /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
342 
343         \return  The sum of all items emitted by this blocking_observable.
344 
345         \sample
346         When the source observable emits at least one item:
347         \snippet blocking_observable.cpp blocking sum sample
348         \snippet output.txt blocking sum sample
349 
350         When the source observable is empty:
351         \snippet blocking_observable.cpp blocking sum empty sample
352         \snippet output.txt blocking sum empty sample
353 
354         When the source observable calls on_error:
355         \snippet blocking_observable.cpp blocking sum error sample
356         \snippet output.txt blocking sum error sample
357     */
sum() const358     T sum() const {
359         return source.sum().as_blocking().last();
360     }
361 
362     /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
363 
364         \return  The average value of all items emitted by this blocking_observable.
365 
366         \sample
367         When the source observable emits at least one item:
368         \snippet blocking_observable.cpp blocking average sample
369         \snippet output.txt blocking average sample
370 
371         When the source observable is empty:
372         \snippet blocking_observable.cpp blocking average empty sample
373         \snippet output.txt blocking average empty sample
374 
375         When the source observable calls on_error:
376         \snippet blocking_observable.cpp blocking average error sample
377         \snippet output.txt blocking average error sample
378     */
average() const379     double average() const {
380         return source.average().as_blocking().last();
381     }
382 
383     /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
384 
385     \return  The max of all items emitted by this blocking_observable.
386 
387     \sample
388     When the source observable emits at least one item:
389     \snippet blocking_observable.cpp blocking max sample
390     \snippet output.txt blocking max sample
391 
392     When the source observable is empty:
393     \snippet blocking_observable.cpp blocking max empty sample
394     \snippet output.txt blocking max empty sample
395 
396     When the source observable calls on_error:
397     \snippet blocking_observable.cpp blocking max error sample
398     \snippet output.txt blocking max error sample
399 */
max() const400     T max() const {
401         return source.max().as_blocking().last();
402     }
403 
404     /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
405 
406     \return  The min of all items emitted by this blocking_observable.
407 
408     \sample
409     When the source observable emits at least one item:
410     \snippet blocking_observable.cpp blocking min sample
411     \snippet output.txt blocking min sample
412 
413     When the source observable is empty:
414     \snippet blocking_observable.cpp blocking min empty sample
415     \snippet output.txt blocking min empty sample
416 
417     When the source observable calls on_error:
418     \snippet blocking_observable.cpp blocking min error sample
419     \snippet output.txt blocking min error sample
420 */
min() const421     T min() const {
422         return source.min().as_blocking().last();
423     }
424 };
425 
426 namespace detail {
427 
428 template<class SourceOperator, class Subscriber>
429 struct safe_subscriber
430 {
safe_subscriberrxcpp::detail::safe_subscriber431     safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
432 
subscriberxcpp::detail::safe_subscriber433     void subscribe() {
434         RXCPP_TRY {
435             so->on_subscribe(*o);
436         } RXCPP_CATCH(...) {
437             if (!o->is_subscribed()) {
438               rxu::rethrow_current_exception();
439             }
440             o->on_error(rxu::make_error_ptr(rxu::current_exception()));
441             o->unsubscribe();
442         }
443     }
444 
operator ()rxcpp::detail::safe_subscriber445     void operator()(const rxsc::schedulable&) {
446         subscribe();
447     }
448 
449     SourceOperator* so;
450     Subscriber* o;
451 };
452 
453 }
454 
455 template<>
456 class observable<void, void>;
457 
458 /*!
459     \defgroup group-observable Observables
460 
461     \brief These are the set of observable classes in rxcpp.
462 
463     \class rxcpp::observable
464 
465     \ingroup group-observable group-core
466 
467     \brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
468 
469     \par Some code
    This sample will observable::subscribe() to values from an observable<void, void>::range().
471 
472     \sample
473     \snippet range.cpp range sample
474     \snippet output.txt range sample
475 
476 */
477 template<class T, class SourceOperator>
478 class observable
479     : public observable_base<T>
480 {
481     static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
482 
483     typedef observable<T, SourceOperator> this_type;
484 
485 public:
486     typedef rxu::decay_t<SourceOperator> source_operator_type;
487     mutable source_operator_type source_operator;
488 
489 private:
490 
491     template<class U, class SO>
492     friend class observable;
493 
494     template<class U, class SO>
495     friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
496 
497     template<class Subscriber>
detail_subscribe(Subscriber o) const498     auto detail_subscribe(Subscriber o) const
499         -> composite_subscription {
500 
501         typedef rxu::decay_t<Subscriber> subscriber_type;
502 
503         static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
504         static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
505         static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
506 
507         trace_activity().subscribe_enter(*this, o);
508 
509         if (!o.is_subscribed()) {
510             trace_activity().subscribe_return(*this);
511             return o.get_subscription();
512         }
513 
514         detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
515 
516         // make sure to let current_thread take ownership of the thread as early as possible.
517         if (rxsc::current_thread::is_schedule_required()) {
518             const auto& sc = rxsc::make_current_thread();
519             sc.create_worker(o.get_subscription()).schedule(subscriber);
520         } else {
521             // current_thread already owns this thread.
522             subscriber.subscribe();
523         }
524 
525         trace_activity().subscribe_return(*this);
526         return o.get_subscription();
527     }
528 
529 public:
530     typedef T value_type;
531 
532     static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
533 
~observable()534     ~observable()
535     {
536     }
537 
observable()538     observable()
539     {
540     }
541 
observable(const source_operator_type & o)542     explicit observable(const source_operator_type& o)
543         : source_operator(o)
544     {
545     }
observable(source_operator_type && o)546     explicit observable(source_operator_type&& o)
547         : source_operator(std::move(o))
548     {
549     }
550 
551     /// implicit conversion between observables of the same value_type
552     template<class SO>
observable(const observable<T,SO> & o)553     observable(const observable<T, SO>& o)
554         : source_operator(o.source_operator)
555     {}
556     /// implicit conversion between observables of the same value_type
557     template<class SO>
observable(observable<T,SO> && o)558     observable(observable<T, SO>&& o)
559         : source_operator(std::move(o.source_operator))
560     {}
561 
562 #if 0
563     template<class I>
564     void on_subscribe(observer<T, I> o) const {
565         source_operator.on_subscribe(o);
566     }
567 #endif
568 
569     /*! @copydoc rxcpp::operators::as_dynamic
570      */
571     template<class... AN>
as_dynamic(AN ** ...) const572     observable<T> as_dynamic(AN**...) const {
573         return *this;
574         static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
575     }
576 
577     /*! @copydoc rx-ref_count.hpp
578      */
579     template<class... AN>
ref_count(AN...an) const580     auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
581         /// \cond SHOW_SERVICE_MEMBERS
582         -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
583         /// \endcond
584     {
585         return      observable_member(ref_count_tag{},                *this, std::forward<AN>(an)...);
586     }
587 
588     /*! @copydoc rxcpp::operators::as_blocking
589      */
590     template<class... AN>
as_blocking(AN ** ...) const591     blocking_observable<T, this_type> as_blocking(AN**...) const {
592         return blocking_observable<T, this_type>(*this);
593         static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
594     }
595 
596     /// \cond SHOW_SERVICE_MEMBERS
597 
598     ///
599     /// takes any function that will take this observable and produce a result value.
600     /// this is intended to allow externally defined operators, that use subscribe,
601     /// to be connected into the expression.
602     ///
603     template<class OperatorFactory>
op(OperatorFactory && of) const604     auto op(OperatorFactory&& of) const
605         -> decltype(of(*(const this_type*)nullptr)) {
606         return      of(*this);
607         static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
608     }
609 
610     /*! @copydoc rx-lift.hpp
611      */
612     template<class ResultType, class Operator>
lift(Operator && op) const613     auto lift(Operator&& op) const
614         ->      observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
615         return  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
616                                                                                                                       rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
617         static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
618     }
619 
620     ///
621     /// takes any function that will take a subscriber for this observable and produce a subscriber.
622     /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
623     /// into the expression.
624     ///
625     template<class ResultType, class Operator>
lift_if(Operator && op) const626     auto lift_if(Operator&& op) const
627         -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
628             observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
629         return  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
630                                                                                                                       rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
631     }
632     ///
633     /// takes any function that will take a subscriber for this observable and produce a subscriber.
634     /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
635     /// into the expression.
636     ///
637     template<class ResultType, class Operator>
lift_if(Operator &&) const638     auto lift_if(Operator&&) const
639         -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
640             decltype(rxs::from<ResultType>())>::type {
641         return       rxs::from<ResultType>();
642     }
643     /// \endcond
644 
645     /*! @copydoc rx-subscribe.hpp
646      */
647     template<class... ArgN>
subscribe(ArgN &&...an) const648     auto subscribe(ArgN&&... an) const
649         -> composite_subscription {
650         return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
651     }
652 
653     /*! @copydoc rx-all.hpp
654      */
655     template<class... AN>
all(AN &&...an) const656     auto all(AN&&... an) const
657     /// \cond SHOW_SERVICE_MEMBERS
658     -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
659     /// \endcond
660     {
661         return  observable_member(all_tag{},                *this, std::forward<AN>(an)...);
662     }
663 
664     /*! @copydoc rxcpp::operators::is_empty
665      */
666     template<class... AN>
is_empty(AN &&...an) const667     auto is_empty(AN&&... an) const
668     /// \cond SHOW_SERVICE_MEMBERS
669     -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
670     /// \endcond
671     {
672         return  observable_member(is_empty_tag{},                *this, std::forward<AN>(an)...);
673     }
674 
675     /*! @copydoc rx-any.hpp
676      */
677     template<class... AN>
any(AN &&...an) const678     auto any(AN&&... an) const
679     /// \cond SHOW_SERVICE_MEMBERS
680     -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
681     /// \endcond
682     {
683         return  observable_member(any_tag{},                *this, std::forward<AN>(an)...);
684     }
685 
686     /*! @copydoc rxcpp::operators::exists
687      */
688     template<class... AN>
exists(AN &&...an) const689     auto exists(AN&&... an) const
690     /// \cond SHOW_SERVICE_MEMBERS
691     -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
692     /// \endcond
693     {
694         return  observable_member(exists_tag{},                *this, std::forward<AN>(an)...);
695     }
696 
697     /*! @copydoc rxcpp::operators::contains
698      */
699     template<class... AN>
contains(AN &&...an) const700     auto contains(AN&&... an) const
701     /// \cond SHOW_SERVICE_MEMBERS
702     -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
703     /// \endcond
704     {
705         return  observable_member(contains_tag{},                *this, std::forward<AN>(an)...);
706     }
707 
708     /*! @copydoc rx-filter.hpp
709      */
710     template<class... AN>
filter(AN &&...an) const711     auto filter(AN&&... an) const
712     /// \cond SHOW_SERVICE_MEMBERS
713     -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
714     /// \endcond
715     {
716         return  observable_member(filter_tag{}, *this,                std::forward<AN>(an)...);
717     }
718 
719     /*! @copydoc rx-switch_if_empty.hpp
720     */
721     template<class... AN>
switch_if_empty(AN &&...an) const722     auto switch_if_empty(AN&&... an) const
723         /// \cond SHOW_SERVICE_MEMBERS
724         -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
725         /// \endcond
726     {
727         return      observable_member(switch_if_empty_tag{},                *this, std::forward<AN>(an)...);
728     }
729 
730     /*! @copydoc rxcpp::operators::default_if_empty
731     */
732     template<class... AN>
default_if_empty(AN &&...an) const733     auto default_if_empty(AN&&... an) const
734         /// \cond SHOW_SERVICE_MEMBERS
735         -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
736         /// \endcond
737     {
738         return      observable_member(default_if_empty_tag{},                *this, std::forward<AN>(an)...);
739     }
740 
741     /*! @copydoc rx-sequence_equal.hpp
742      */
743     template<class... AN>
sequence_equal(AN...an) const744     auto sequence_equal(AN... an) const
745         /// \cond SHOW_SERVICE_MEMBERS
746         -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
747         /// \endcond
748     {
749         return      observable_member(sequence_equal_tag{},                *this, std::forward<AN>(an)...);
750     }
751 
752     /*! @copydoc rx-tap.hpp
753      */
754     template<class... AN>
tap(AN &&...an) const755     auto tap(AN&&... an) const
756         /// \cond SHOW_SERVICE_MEMBERS
757         -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
758         /// \endcond
759     {
760         return      observable_member(tap_tag{},                *this, std::forward<AN>(an)...);
761     }
762 
763     /*! @copydoc rx-time_interval.hpp
764      */
765     template<class... AN>
time_interval(AN &&...an) const766     auto time_interval(AN&&... an) const
767     /// \cond SHOW_SERVICE_MEMBERS
768     -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
769     /// \endcond
770     {
771         return  observable_member(time_interval_tag{},                *this, std::forward<AN>(an)...);
772     }
773 
774     /*! @copydoc rx-timeout.hpp
775      */
776     template<class... AN>
timeout(AN &&...an) const777     auto timeout(AN&&... an) const
778     /// \cond SHOW_SERVICE_MEMBERS
779     -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
780     /// \endcond
781     {
782         return  observable_member(timeout_tag{},                *this, std::forward<AN>(an)...);
783     }
784 
785     /*! @copydoc rx-timestamp.hpp
786      */
787     template<class... AN>
timestamp(AN &&...an) const788     auto timestamp(AN&&... an) const
789     /// \cond SHOW_SERVICE_MEMBERS
790     -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
791     /// \endcond
792     {
793         return  observable_member(timestamp_tag{},                *this, std::forward<AN>(an)...);
794     }
795 
796     /*! @copydoc rx-finally.hpp
797      */
798     template<class... AN>
finally(AN &&...an) const799     auto finally(AN&&... an) const
800         /// \cond SHOW_SERVICE_MEMBERS
801         -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
802         /// \endcond
803     {
804         return      observable_member(finally_tag{},                *this, std::forward<AN>(an)...);
805     }
806 
807     /*! @copydoc rx-on_error_resume_next.hpp
808     */
809     template<class... AN>
on_error_resume_next(AN &&...an) const810     auto on_error_resume_next(AN&&... an) const
811     /// \cond SHOW_SERVICE_MEMBERS
812     -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
813     /// \endcond
814     {
815         return  observable_member(on_error_resume_next_tag{},                *this, std::forward<AN>(an)...);
816     }
817 
818     /*! @copydoc rx-on_error_resume_next.hpp
819     */
820     template<class... AN>
switch_on_error(AN &&...an) const821     auto switch_on_error(AN&&... an) const
822     /// \cond SHOW_SERVICE_MEMBERS
823     -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
824     /// \endcond
825     {
826         return  observable_member(on_error_resume_next_tag{},                *this, std::forward<AN>(an)...);
827     }
828 
829     /*! @copydoc rx-map.hpp
830      */
831     template<class... AN>
map(AN &&...an) const832     auto map(AN&&... an) const
833     /// \cond SHOW_SERVICE_MEMBERS
834     -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
835     /// \endcond
836     {
837         return  observable_member(map_tag{},                *this, std::forward<AN>(an)...);
838     }
839 
840     /*! @copydoc rx-map.hpp
841      */
842     template<class... AN>
transform(AN &&...an) const843     auto transform(AN&&... an) const
844     /// \cond SHOW_SERVICE_MEMBERS
845     -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
846     /// \endcond
847     {
848         return  observable_member(map_tag{},                *this, std::forward<AN>(an)...);
849     }
850 
851     /*! @copydoc rx-debounce.hpp
852      */
853     template<class... AN>
debounce(AN &&...an) const854     auto debounce(AN&&... an) const
855     /// \cond SHOW_SERVICE_MEMBERS
856     -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
857     /// \endcond
858     {
859         return  observable_member(debounce_tag{},                *this, std::forward<AN>(an)...);
860     }
861 
862     /*! @copydoc rx-delay.hpp
863      */
864     template<class... AN>
delay(AN &&...an) const865     auto delay(AN&&... an) const
866     /// \cond SHOW_SERVICE_MEMBERS
867     -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
868     /// \endcond
869     {
870         return  observable_member(delay_tag{},                *this, std::forward<AN>(an)...);
871     }
872 
873     /*! @copydoc rx-distinct.hpp
874      */
875     template<class... AN>
distinct(AN &&...an) const876     auto distinct(AN&&... an) const
877     /// \cond SHOW_SERVICE_MEMBERS
878     -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
879     /// \endcond
880     {
881         return  observable_member(distinct_tag{},                *this, std::forward<AN>(an)...);
882     }
883 
884     /*! @copydoc rx-distinct_until_changed.hpp
885      */
886     template<class... AN>
distinct_until_changed(AN &&...an) const887     auto distinct_until_changed(AN&&... an) const
888     /// \cond SHOW_SERVICE_MEMBERS
889     -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
890     /// \endcond
891     {
892         return  observable_member(distinct_until_changed_tag{},                *this, std::forward<AN>(an)...);
893     }
894 
895     /*! @copydoc rx-element_at.hpp
896      */
897     template<class... AN>
element_at(AN &&...an) const898     auto element_at(AN&&... an) const
899     /// \cond SHOW_SERVICE_MEMBERS
900     -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
901     /// \endcond
902     {
903         return  observable_member(element_at_tag{},                *this, std::forward<AN>(an)...);
904     }
905 
906     /*! @copydoc rx-window.hpp
907     */
908     template<class... AN>
window(AN &&...an) const909     auto window(AN&&... an) const
910     /// \cond SHOW_SERVICE_MEMBERS
911     -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
912     /// \endcond
913     {
914         return  observable_member(window_tag{},                *this, std::forward<AN>(an)...);
915     }
916 
917     /*! @copydoc rx-window_time.hpp
918     */
919     template<class... AN>
window_with_time(AN &&...an) const920     auto window_with_time(AN&&... an) const
921     /// \cond SHOW_SERVICE_MEMBERS
922     -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
923     /// \endcond
924     {
925         return  observable_member(window_with_time_tag{},                *this, std::forward<AN>(an)...);
926     }
927 
928     /*! @copydoc rx-window_time_count.hpp
929     */
930     template<class... AN>
window_with_time_or_count(AN &&...an) const931     auto window_with_time_or_count(AN&&... an) const
932     /// \cond SHOW_SERVICE_MEMBERS
933     -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
934     /// \endcond
935     {
936         return  observable_member(window_with_time_or_count_tag{},                *this, std::forward<AN>(an)...);
937     }
938 
939     /*! @copydoc rx-window_toggle.hpp
940     */
941     template<class... AN>
window_toggle(AN &&...an) const942     auto window_toggle(AN&&... an) const
943     /// \cond SHOW_SERVICE_MEMBERS
944     -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
945     /// \endcond
946     {
947         return  observable_member(window_toggle_tag{},                *this, std::forward<AN>(an)...);
948     }
949 
950     /*! @copydoc rx-buffer_count.hpp
951      */
952     template<class... AN>
buffer(AN &&...an) const953     auto buffer(AN&&... an) const
954     /// \cond SHOW_SERVICE_MEMBERS
955     -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
956     /// \endcond
957     {
958         return  observable_member(buffer_count_tag{},                *this, std::forward<AN>(an)...);
959     }
960 
961     /*! @copydoc rx-buffer_time.hpp
962      */
963     template<class... AN>
buffer_with_time(AN &&...an) const964     auto buffer_with_time(AN&&... an) const
965     /// \cond SHOW_SERVICE_MEMBERS
966     -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
967     /// \endcond
968     {
969         return  observable_member(buffer_with_time_tag{},                *this, std::forward<AN>(an)...);
970     }
971 
972     /*! @copydoc rx-buffer_time_count.hpp
973      */
974     template<class... AN>
buffer_with_time_or_count(AN &&...an) const975     auto buffer_with_time_or_count(AN&&... an) const
976     /// \cond SHOW_SERVICE_MEMBERS
977     -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
978     /// \endcond
979     {
980         return  observable_member(buffer_with_time_or_count_tag{},                *this, std::forward<AN>(an)...);
981     }
982 
983     /*! @copydoc rx-switch_on_next.hpp
984     */
985     template<class... AN>
switch_on_next(AN &&...an) const986     auto switch_on_next(AN&&... an) const
987         /// \cond SHOW_SERVICE_MEMBERS
988         -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
989         /// \endcond
990     {
991         return      observable_member(switch_on_next_tag{},                *this, std::forward<AN>(an)...);
992     }
993 
994     /*! @copydoc rx-merge.hpp
995      */
996     template<class... AN>
merge(AN...an) const997     auto merge(AN... an) const
998         /// \cond SHOW_SERVICE_MEMBERS
999         -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1000         /// \endcond
1001     {
1002         return      observable_member(merge_tag{},                *this, std::forward<AN>(an)...);
1003     }
1004 
1005     /*! @copydoc rx-merge_delay_error.hpp
1006      */
1007     template<class... AN>
merge_delay_error(AN...an) const1008     auto merge_delay_error(AN... an) const
1009         /// \cond SHOW_SERVICE_MEMBERS
1010         -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1011         /// \endcond
1012     {
1013             return      observable_member(merge_delay_error_tag{},                *this, std::forward<AN>(an)...);
1014     }
1015 
1016     /*! @copydoc rx-amb.hpp
1017      */
1018     template<class... AN>
amb(AN...an) const1019     auto amb(AN... an) const
1020         /// \cond SHOW_SERVICE_MEMBERS
1021         -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1022         /// \endcond
1023     {
1024         return      observable_member(amb_tag{},                *this, std::forward<AN>(an)...);
1025     }
1026 
1027     /*! @copydoc rx-flat_map.hpp
1028      */
1029     template<class... AN>
flat_map(AN &&...an) const1030     auto flat_map(AN&&... an) const
1031         /// \cond SHOW_SERVICE_MEMBERS
1032         -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1033         /// \endcond
1034     {
1035         return      observable_member(flat_map_tag{},                *this, std::forward<AN>(an)...);
1036     }
1037 
1038     /*! @copydoc rx-flat_map.hpp
1039      */
1040     template<class... AN>
merge_transform(AN &&...an) const1041     auto merge_transform(AN&&... an) const
1042         /// \cond SHOW_SERVICE_MEMBERS
1043         -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1044         /// \endcond
1045     {
1046         return      observable_member(flat_map_tag{},                *this, std::forward<AN>(an)...);
1047     }
1048 
1049     /*! @copydoc rx-concat.hpp
1050      */
1051     template<class... AN>
concat(AN...an) const1052     auto concat(AN... an) const
1053         /// \cond SHOW_SERVICE_MEMBERS
1054         -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1055         /// \endcond
1056     {
1057         return      observable_member(concat_tag{},                *this, std::forward<AN>(an)...);
1058     }
1059 
1060     /*! @copydoc rx-concat_map.hpp
1061      */
1062     template<class... AN>
concat_map(AN &&...an) const1063     auto concat_map(AN&&... an) const
1064         /// \cond SHOW_SERVICE_MEMBERS
1065         -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1066         /// \endcond
1067     {
1068         return      observable_member(concat_map_tag{},                *this, std::forward<AN>(an)...);
1069     }
1070 
1071     /*! @copydoc rx-concat_map.hpp
1072      */
1073     template<class... AN>
concat_transform(AN &&...an) const1074     auto concat_transform(AN&&... an) const
1075         /// \cond SHOW_SERVICE_MEMBERS
1076         -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1077         /// \endcond
1078     {
1079         return      observable_member(concat_map_tag{},                *this, std::forward<AN>(an)...);
1080     }
1081 
1082     /*! @copydoc rx-with_latest_from.hpp
1083      */
1084     template<class... AN>
with_latest_from(AN...an) const1085     auto with_latest_from(AN... an) const
1086         /// \cond SHOW_SERVICE_MEMBERS
1087         -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1088         /// \endcond
1089     {
1090         return      observable_member(with_latest_from_tag{},                *this, std::forward<AN>(an)...);
1091     }
1092 
1093 
1094     /*! @copydoc rx-combine_latest.hpp
1095      */
1096     template<class... AN>
combine_latest(AN...an) const1097     auto combine_latest(AN... an) const
1098         /// \cond SHOW_SERVICE_MEMBERS
1099         -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1100         /// \endcond
1101     {
1102         return      observable_member(combine_latest_tag{},                *this, std::forward<AN>(an)...);
1103     }
1104 
1105     /*! @copydoc rx-zip.hpp
1106      */
1107     template<class... AN>
zip(AN &&...an) const1108     auto zip(AN&&... an) const
1109         /// \cond SHOW_SERVICE_MEMBERS
1110         -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1111         /// \endcond
1112     {
1113         return      observable_member(zip_tag{},                *this, std::forward<AN>(an)...);
1114     }
1115 
1116     /*! @copydoc rx-group_by.hpp
1117      */
1118     template<class... AN>
group_by(AN &&...an) const1119     inline auto group_by(AN&&... an) const
1120         /// \cond SHOW_SERVICE_MEMBERS
1121         -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1122         /// \endcond
1123     {
1124         return      observable_member(group_by_tag{},                *this, std::forward<AN>(an)...);
1125     }
1126 
1127     /*! @copydoc rx-ignore_elements.hpp
1128      */
1129     template<class... AN>
ignore_elements(AN &&...an) const1130     auto ignore_elements(AN&&... an) const
1131     /// \cond SHOW_SERVICE_MEMBERS
1132     -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1133     /// \endcond
1134     {
1135         return  observable_member(ignore_elements_tag{},                *this, std::forward<AN>(an)...);
1136     }
1137 
    /*! @copydoc rx-multicast.hpp
1139      */
1140     template<class... AN>
multicast(AN &&...an) const1141     auto multicast(AN&&... an) const
1142         /// \cond SHOW_SERVICE_MEMBERS
1143         -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1144         /// \endcond
1145     {
1146         return      observable_member(multicast_tag{},                *this, std::forward<AN>(an)...);
1147     }
1148 
1149     /*! @copydoc rx-publish.hpp
1150      */
1151     template<class... AN>
publish(AN &&...an) const1152     auto publish(AN&&... an) const
1153         /// \cond SHOW_SERVICE_MEMBERS
1154         -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1155         /// \endcond
1156     {
1157         return      observable_member(publish_tag{},                *this, std::forward<AN>(an)...);
1158     }
1159 
1160     /*! @copydoc rxcpp::operators::publish_synchronized
1161      */
1162     template<class... AN>
publish_synchronized(AN &&...an) const1163     auto publish_synchronized(AN&&... an) const
1164         /// \cond SHOW_SERVICE_MEMBERS
1165         -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1166         /// \endcond
1167     {
1168         return      observable_member(publish_synchronized_tag{},                *this, std::forward<AN>(an)...);
1169     }
1170 
1171     /*! @copydoc rx-replay.hpp
1172      */
1173     template<class... AN>
replay(AN &&...an) const1174     auto replay(AN&&... an) const
1175         /// \cond SHOW_SERVICE_MEMBERS
1176         -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1177         /// \endcond
1178     {
1179         return      observable_member(replay_tag{},                *this, std::forward<AN>(an)...);
1180     }
1181 
1182     /*! @copydoc rx-subscribe_on.hpp
1183      */
1184     template<class... AN>
subscribe_on(AN &&...an) const1185     auto subscribe_on(AN&&... an) const
1186         /// \cond SHOW_SERVICE_MEMBERS
1187         -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1188         /// \endcond
1189     {
1190         return      observable_member(subscribe_on_tag{},                *this, std::forward<AN>(an)...);
1191     }
1192 
1193     /*! @copydoc rx-observe_on.hpp
1194     */
1195     template<class... AN>
observe_on(AN &&...an) const1196     auto observe_on(AN&&... an) const
1197         /// \cond SHOW_SERVICE_MEMBERS
1198         -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1199         /// \endcond
1200     {
1201         return      observable_member(observe_on_tag{},                *this, std::forward<AN>(an)...);
1202     }
1203 
1204     /*! @copydoc rx-reduce.hpp
1205      */
1206     template<class... AN>
reduce(AN &&...an) const1207     auto reduce(AN&&... an) const
1208         /// \cond SHOW_SERVICE_MEMBERS
1209         -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1210         /// \endcond
1211     {
1212         return      observable_member(reduce_tag{},                *this, std::forward<AN>(an)...);
1213     }
1214 
1215     /*! @copydoc rx-reduce.hpp
1216      */
1217     template<class... AN>
accumulate(AN &&...an) const1218     auto accumulate(AN&&... an) const
1219         /// \cond SHOW_SERVICE_MEMBERS
1220         -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1221         /// \endcond
1222     {
1223         return      observable_member(reduce_tag{},                *this, std::forward<AN>(an)...);
1224     }
1225 
1226     /*! @copydoc rxcpp::operators::first
1227      */
1228     template<class... AN>
first(AN ** ...) const1229     auto first(AN**...) const
1230         /// \cond SHOW_SERVICE_MEMBERS
1231         -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr))
1232         /// \endcond
1233     {
1234         return      observable_member(delayed_type<first_tag, AN...>::value(),                *this);
1235         static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1236     }
1237 
1238     /*! @copydoc rxcpp::operators::last
1239      */
1240     template<class... AN>
last(AN ** ...) const1241     auto last(AN**...) const
1242         /// \cond SHOW_SERVICE_MEMBERS
1243         -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr))
1244         /// \endcond
1245     {
1246         return      observable_member(delayed_type<last_tag, AN...>::value(),                *this);
1247         static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1248     }
1249 
1250     /*! @copydoc rxcpp::operators::count
1251      */
1252     template<class... AN>
count(AN ** ...) const1253     auto count(AN**...) const
1254         /// \cond SHOW_SERVICE_MEMBERS
1255         -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>()))
1256         /// \endcond
1257     {
1258         return      observable_member(delayed_type<reduce_tag, AN...>::value(),                *this, 0, rxu::count(), identity_for<int>());
1259         static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1260     }
1261 
1262     /*! @copydoc rxcpp::operators::sum
1263      */
1264     template<class... AN>
sum(AN ** ...) const1265     auto sum(AN**...) const
1266         /// \cond SHOW_SERVICE_MEMBERS
1267         -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr))
1268         /// \endcond
1269     {
1270         return      observable_member(delayed_type<sum_tag, AN...>::value(),                *this);
1271         static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1272     }
1273 
1274     /*! @copydoc rxcpp::operators::average
1275      */
1276     template<class... AN>
average(AN ** ...) const1277     auto average(AN**...) const
1278         /// \cond SHOW_SERVICE_MEMBERS
1279         -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr))
1280         /// \endcond
1281     {
1282         return      observable_member(delayed_type<average_tag, AN...>::value(),                *this);
1283         static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1284     }
1285 
1286     /*! @copydoc rxcpp::operators::max
1287      */
1288     template<class... AN>
max(AN ** ...) const1289     auto max(AN**...) const
1290         /// \cond SHOW_SERVICE_MEMBERS
1291         -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr))
1292         /// \endcond
1293     {
1294         return      observable_member(delayed_type<max_tag, AN...>::value(),                *this);
1295         static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1296     }
1297 
1298     /*! @copydoc rxcpp::operators::min
1299      */
1300     template<class... AN>
min(AN ** ...) const1301     auto min(AN**...) const
1302         /// \cond SHOW_SERVICE_MEMBERS
1303         -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr))
1304         /// \endcond
1305     {
1306         return      observable_member(delayed_type<min_tag, AN...>::value(),                *this);
1307         static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1308     }
1309 
1310     /*! @copydoc rx-scan.hpp
1311     */
1312     template<class... AN>
scan(AN...an) const1313     auto scan(AN... an) const
1314         /// \cond SHOW_SERVICE_MEMBERS
1315         -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1316         /// \endcond
1317     {
1318         return      observable_member(scan_tag{},                *this, std::forward<AN>(an)...);
1319     }
1320 
1321     /*! @copydoc rx-sample_time.hpp
1322      */
1323     template<class... AN>
sample_with_time(AN &&...an) const1324     auto sample_with_time(AN&&... an) const
1325         /// \cond SHOW_SERVICE_MEMBERS
1326         -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1327         /// \endcond
1328     {
1329         return      observable_member(sample_with_time_tag{},                *this, std::forward<AN>(an)...);
1330     }
1331 
1332     /*! @copydoc rx-skip.hpp
1333     */
1334     template<class... AN>
skip(AN...an) const1335     auto skip(AN... an) const
1336         /// \cond SHOW_SERVICE_MEMBERS
1337         -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1338         /// \endcond
1339     {
1340         return      observable_member(skip_tag{},                *this, std::forward<AN>(an)...);
1341     }
1342 
    /*! @copydoc rx-skip_while.hpp
1344      */
1345     template<class... AN>
skip_while(AN...an) const1346     auto skip_while(AN... an) const
1347         /// \cond SHOW_SERVICE_MEMBERS
1348         -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1349         /// \endcond
1350     {
1351         return      observable_member(skip_while_tag{},                *this, std::forward<AN>(an)...);
1352     }
1353 
1354     /*! @copydoc rx-skip_last.hpp
1355     */
1356     template<class... AN>
skip_last(AN...an) const1357     auto skip_last(AN... an) const
1358         /// \cond SHOW_SERVICE_MEMBERS
1359         -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1360         /// \endcond
1361     {
1362         return      observable_member(skip_last_tag{},                *this, std::forward<AN>(an)...);
1363     }
1364 
1365     /*! @copydoc rx-skip_until.hpp
1366     */
1367     template<class... AN>
skip_until(AN...an) const1368     auto skip_until(AN... an) const
1369         /// \cond SHOW_SERVICE_MEMBERS
1370         -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1371         /// \endcond
1372     {
1373         return      observable_member(skip_until_tag{},                *this, std::forward<AN>(an)...);
1374     }
1375 
1376     /*! @copydoc rx-take.hpp
1377      */
1378     template<class... AN>
take(AN...an) const1379     auto take(AN... an) const
1380         /// \cond SHOW_SERVICE_MEMBERS
1381         -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1382         /// \endcond
1383     {
1384         return      observable_member(take_tag{},                *this, std::forward<AN>(an)...);
1385     }
1386 
1387     /*! @copydoc rx-take_last.hpp
1388     */
1389     template<class... AN>
take_last(AN &&...an) const1390     auto take_last(AN&&... an) const
1391         /// \cond SHOW_SERVICE_MEMBERS
1392         -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1393         /// \endcond
1394     {
1395         return      observable_member(take_last_tag{},                *this, std::forward<AN>(an)...);
1396     }
1397 
1398     /*! @copydoc rx-take_until.hpp
1399     */
1400     template<class... AN>
take_until(AN &&...an) const1401     auto take_until(AN&&... an) const
1402         /// \cond SHOW_SERVICE_MEMBERS
1403         -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1404         /// \endcond
1405     {
1406         return      observable_member(take_until_tag{},                *this, std::forward<AN>(an)...);
1407     }
1408 
1409     /*! @copydoc rx-take_while.hpp
1410     */
1411     template<class... AN>
take_while(AN &&...an) const1412     auto take_while(AN&&... an) const
1413         /// \cond SHOW_SERVICE_MEMBERS
1414         -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1415         /// \endcond
1416     {
1417         return      observable_member(take_while_tag{},                *this, std::forward<AN>(an)...);
1418     }
1419 
1420     /*! @copydoc rx-repeat.hpp
1421      */
1422     template<class... AN>
repeat(AN...an) const1423     auto repeat(AN... an) const
1424         /// \cond SHOW_SERVICE_MEMBERS
1425         -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1426         /// \endcond
1427     {
1428         return      observable_member(repeat_tag{},                *this, std::forward<AN>(an)...);
1429     }
1430 
1431     /*! @copydoc rx-retry.hpp
1432      */
1433     template<class... AN>
retry(AN...an) const1434     auto retry(AN... an) const
1435         /// \cond SHOW_SERVICE_MEMBERS
1436         -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1437         /// \endcond
1438     {
1439         return      observable_member(retry_tag{},                *(this_type*)this, std::forward<AN>(an)...);
1440     }
1441 
1442     /*! @copydoc rx-start_with.hpp
1443      */
1444     template<class... AN>
start_with(AN...an) const1445     auto start_with(AN... an) const
1446         /// \cond SHOW_SERVICE_MEMBERS
1447         -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1448         /// \endcond
1449     {
1450         return      observable_member(start_with_tag{},                *this, std::forward<AN>(an)...);
1451     }
1452 
1453     /*! @copydoc rx-pairwise.hpp
1454      */
1455     template<class... AN>
pairwise(AN...an) const1456     auto pairwise(AN... an) const
1457         /// \cond SHOW_SERVICE_MEMBERS
1458         -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1459         /// \endcond
1460     {
1461         return      observable_member(pairwise_tag{},                *this, std::forward<AN>(an)...);
1462     }
1463 };
1464 
1465 template<class T, class SourceOperator>
operator ==(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1466 inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1467     return lhs.source_operator == rhs.source_operator;
1468 }
1469 template<class T, class SourceOperator>
operator !=(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1470 inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1471     return !(lhs == rhs);
1472 }
1473 
1474 /*!
1475     \defgroup group-core Basics
1476 
1477     \brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled.
1478 
1479     \class rxcpp::observable<void, void>
1480 
1481     \brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable.
1482 
1483     \ingroup group-core
1484 
1485     \par Create a new type of observable
1486 
1487     \sample
1488     \snippet create.cpp Create sample
1489     \snippet output.txt Create sample
1490 
1491     \par Create an observable that emits a range of values
1492 
1493     \sample
1494     \snippet range.cpp range sample
1495     \snippet output.txt range sample
1496 
1497     \par Create an observable that emits nothing / generates an error / immediately completes
1498 
1499     \sample
1500     \snippet never.cpp never sample
1501     \snippet output.txt never sample
1502     \snippet error.cpp error sample
1503     \snippet output.txt error sample
1504     \snippet empty.cpp empty sample
1505     \snippet output.txt empty sample
1506 
1507     \par Create an observable that generates new observable for each subscriber
1508 
1509     \sample
1510     \snippet defer.cpp defer sample
1511     \snippet output.txt defer sample
1512 
1513     \par Create an observable that emits items every specified interval of time
1514 
1515     \sample
1516     \snippet interval.cpp interval sample
1517     \snippet output.txt interval sample
1518 
1519     \par Create an observable that emits items in the specified interval of time
1520 
1521     \sample
1522     \snippet timer.cpp duration timer sample
1523     \snippet output.txt duration timer sample
1524 
1525     \par Create an observable that emits all items from a collection
1526 
1527     \sample
1528     \snippet iterate.cpp iterate sample
1529     \snippet output.txt iterate sample
1530 
1531     \par Create an observable that emits a set of specified items
1532 
1533     \sample
1534     \snippet from.cpp from sample
1535     \snippet output.txt from sample
1536 
1537     \par Create an observable that emits a single item
1538 
1539     \sample
1540     \snippet just.cpp just sample
1541     \snippet output.txt just sample
1542 
1543     \par Create an observable that emits a set of items and then subscribes to another observable
1544 
1545     \sample
1546     \snippet start_with.cpp full start_with sample
1547     \snippet output.txt full start_with sample
1548 
1549     \par Create an observable that generates a new observable based on a generated resource for each subscriber
1550 
1551     \sample
1552     \snippet scope.cpp scope sample
1553     \snippet output.txt scope sample
1554 
1555 */
1556 template<>
1557 class observable<void, void>
1558 {
1559     ~observable();
1560 public:
1561     /*! @copydoc rx-create.hpp
1562      */
1563     template<class T, class OnSubscribe>
create(OnSubscribe os)1564     static auto create(OnSubscribe os)
1565         -> decltype(rxs::create<T>(std::move(os))) {
1566         return      rxs::create<T>(std::move(os));
1567     }
1568 
1569     /*! @copydoc rx-range.hpp
1570      */
1571     template<class T>
range(T first=0,T last=std::numeric_limits<T>::max (),std::ptrdiff_t step=1)1572     static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1573         -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1574         return      rxs::range<T>(first, last, step, identity_current_thread());
1575     }
1576     /*! @copydoc rx-range.hpp
1577      */
1578     template<class T, class Coordination>
range(T first,T last,std::ptrdiff_t step,Coordination cn)1579     static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1580         -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1581         return      rxs::range<T>(first, last, step, std::move(cn));
1582     }
1583     /*! @copydoc rx-range.hpp
1584      */
1585     template<class T, class Coordination>
range(T first,T last,Coordination cn)1586     static auto range(T first, T last, Coordination cn)
1587         -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1588         return      rxs::range<T>(first, last, std::move(cn));
1589     }
1590     /*! @copydoc rx-range.hpp
1591      */
1592     template<class T, class Coordination>
range(T first,Coordination cn)1593     static auto range(T first, Coordination cn)
1594         -> decltype(rxs::range<T>(first, std::move(cn))) {
1595         return      rxs::range<T>(first, std::move(cn));
1596     }
1597 
1598     /*! @copydoc rx-never.hpp
1599      */
1600     template<class T>
never()1601     static auto never()
1602         -> decltype(rxs::never<T>()) {
1603         return      rxs::never<T>();
1604     }
1605 
1606     /*! @copydoc rx-defer.hpp
1607      */
1608     template<class ObservableFactory>
defer(ObservableFactory of)1609     static auto defer(ObservableFactory of)
1610         -> decltype(rxs::defer(std::move(of))) {
1611         return      rxs::defer(std::move(of));
1612     }
1613 
1614     /*! @copydoc rx-interval.hpp
1615      */
1616     template<class... AN>
interval(rxsc::scheduler::clock_type::duration period,AN ** ...)1617     static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1618         -> decltype(rxs::interval(period)) {
1619         return      rxs::interval(period);
1620         static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1621     }
1622     /*! @copydoc rx-interval.hpp
1623      */
1624     template<class Coordination>
interval(rxsc::scheduler::clock_type::duration period,Coordination cn)1625     static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1626         -> decltype(rxs::interval(period, std::move(cn))) {
1627         return      rxs::interval(period, std::move(cn));
1628     }
1629     /*! @copydoc rx-interval.hpp
1630      */
1631     template<class... AN>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,AN ** ...)1632     static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1633         -> decltype(rxs::interval(initial, period)) {
1634         return      rxs::interval(initial, period);
1635         static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1636     }
1637     /*! @copydoc rx-interval.hpp
1638      */
1639     template<class Coordination>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,Coordination cn)1640     static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1641         -> decltype(rxs::interval(initial, period, std::move(cn))) {
1642         return      rxs::interval(initial, period, std::move(cn));
1643     }
1644 
1645     /*! @copydoc rx-timer.hpp
1646      */
1647     template<class... AN>
timer(rxsc::scheduler::clock_type::time_point at,AN ** ...)1648     static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1649         -> decltype(rxs::timer(at)) {
1650         return      rxs::timer(at);
1651         static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1652     }
1653     /*! @copydoc rx-timer.hpp
1654      */
1655     template<class... AN>
timer(rxsc::scheduler::clock_type::duration after,AN ** ...)1656     static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1657         -> decltype(rxs::timer(after)) {
1658         return      rxs::timer(after);
1659         static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1660     }
1661     /*! @copydoc rx-timer.hpp
1662      */
1663     template<class Coordination>
timer(rxsc::scheduler::clock_type::time_point when,Coordination cn)1664     static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1665         -> decltype(rxs::timer(when, std::move(cn))) {
1666         return      rxs::timer(when, std::move(cn));
1667     }
1668     /*! @copydoc rx-timer.hpp
1669      */
1670     template<class Coordination>
timer(rxsc::scheduler::clock_type::duration when,Coordination cn)1671     static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1672         -> decltype(rxs::timer(when, std::move(cn))) {
1673         return      rxs::timer(when, std::move(cn));
1674     }
1675 
1676     /*! @copydoc rx-iterate.hpp
1677      */
1678     template<class Collection>
iterate(Collection c)1679     static auto iterate(Collection c)
1680         -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1681         return      rxs::iterate(std::move(c), identity_current_thread());
1682     }
1683     /*! @copydoc rx-iterate.hpp
1684      */
1685     template<class Collection, class Coordination>
iterate(Collection c,Coordination cn)1686     static auto iterate(Collection c, Coordination cn)
1687         -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1688         return      rxs::iterate(std::move(c), std::move(cn));
1689     }
1690 
1691     /*! @copydoc rxcpp::sources::from()
1692      */
1693     template<class T>
from()1694     static auto from()
1695         -> decltype(    rxs::from<T>()) {
1696         return          rxs::from<T>();
1697     }
1698     /*! @copydoc rxcpp::sources::from(Coordination cn)
1699      */
1700     template<class T, class Coordination>
from(Coordination cn)1701     static auto from(Coordination cn)
1702         -> typename std::enable_if<is_coordination<Coordination>::value,
1703             decltype(   rxs::from<T>(std::move(cn)))>::type {
1704         return          rxs::from<T>(std::move(cn));
1705     }
1706     /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
1707      */
1708     template<class Value0, class... ValueN>
from(Value0 v0,ValueN...vn)1709     static auto from(Value0 v0, ValueN... vn)
1710         -> typename std::enable_if<!is_coordination<Value0>::value,
1711             decltype(   rxs::from(v0, vn...))>::type {
1712         return          rxs::from(v0, vn...);
1713     }
1714     /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
1715      */
1716     template<class Coordination, class Value0, class... ValueN>
from(Coordination cn,Value0 v0,ValueN...vn)1717     static auto from(Coordination cn, Value0 v0, ValueN... vn)
1718         -> typename std::enable_if<is_coordination<Coordination>::value,
1719             decltype(   rxs::from(std::move(cn), v0, vn...))>::type {
1720         return          rxs::from(std::move(cn), v0, vn...);
1721     }
1722 
1723     /*! @copydoc rxcpp::sources::just(Value0 v0)
1724      */
1725     template<class T>
just(T v)1726     static auto just(T v)
1727         -> decltype(rxs::just(std::move(v))) {
1728         return      rxs::just(std::move(v));
1729     }
1730     /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
1731      */
1732     template<class T, class Coordination>
just(T v,Coordination cn)1733     static auto just(T v, Coordination cn)
1734         -> decltype(rxs::just(std::move(v), std::move(cn))) {
1735         return      rxs::just(std::move(v), std::move(cn));
1736     }
1737 
1738     /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
1739      */
1740     template<class Observable, class Value0, class... ValueN>
start_with(Observable o,Value0 v0,ValueN...vn)1741     static auto start_with(Observable o, Value0 v0, ValueN... vn)
1742         -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1743         return      rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1744     }
1745 
1746     /*! @copydoc rx-empty.hpp
1747      */
1748     template<class T>
empty()1749     static auto empty()
1750         -> decltype(from<T>()) {
1751         return      from<T>();
1752     }
1753     /*! @copydoc rx-empty.hpp
1754      */
1755     template<class T, class Coordination>
empty(Coordination cn)1756     static auto empty(Coordination cn)
1757         -> decltype(from<T>(std::move(cn))) {
1758         return      from<T>(std::move(cn));
1759     }
1760 
1761     /*! @copydoc rx-error.hpp
1762      */
1763     template<class T, class Exception>
error(Exception && e)1764     static auto error(Exception&& e)
1765         -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1766         return      rxs::error<T>(std::forward<Exception>(e));
1767     }
1768     /*! @copydoc rx-error.hpp
1769      */
1770     template<class T, class Exception, class Coordination>
error(Exception && e,Coordination cn)1771     static auto error(Exception&& e, Coordination cn)
1772         -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1773         return      rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1774     }
1775 
1776     /*! @copydoc rx-scope.hpp
1777      */
1778     template<class ResourceFactory, class ObservableFactory>
scope(ResourceFactory rf,ObservableFactory of)1779     static auto scope(ResourceFactory rf, ObservableFactory of)
1780         -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1781         return      rxs::scope(std::move(rf), std::move(of));
1782     }
1783 };
1784 
1785 }
1786 
1787 //
1788 // support range() >> filter() >> subscribe() syntax
1789 // '>>' is spelled 'stream'
1790 //
1791 template<class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1792 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1793     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1794     return      source.op(std::forward<OperatorFactory>(of));
1795 }
1796 
1797 //
1798 // support range() | filter() | subscribe() syntax
1799 // '|' is spelled 'pipe'
1800 //
1801 template<class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1802 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1803     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1804     return      source.op(std::forward<OperatorFactory>(of));
1805 }
1806 
1807 #endif
1808