1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7 
8 #include "rx-includes.hpp"
9 
// EXPLICIT_THIS expands to `this->` when __GNUG__ is defined and to nothing otherwise.
// NOTE(review): presumably meant to qualify member access that needs an explicit
// `this->` inside templates on GNU-compatible compilers; no use is visible in this
// part of the file, so confirm against the call sites.
#ifdef __GNUG__
#define EXPLICIT_THIS this->
#else
#define EXPLICIT_THIS
#endif
15 
16 namespace rxcpp {
17 
18 namespace detail {
19 
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23     struct not_void {};
24     template<class CS, class CT>
25     static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26     template<class CS, class CT>
27     static not_void check(...);
28 
29     typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30     static const bool value = std::is_same<detail_result, void>::value;
31 };
32 
33 }
34 
35 template<class T>
36 class dynamic_observable
37     : public rxs::source_base<T>
38 {
39     struct state_type
40         : public std::enable_shared_from_this<state_type>
41     {
42         typedef std::function<void(subscriber<T>)> onsubscribe_type;
43 
44         onsubscribe_type on_subscribe;
45     };
46     std::shared_ptr<state_type> state;
47 
48     template<class U>
49     friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50 
51     template<class SO>
construct(SO && source,rxs::tag_source &&)52     void construct(SO&& source, rxs::tag_source&&) {
53         rxu::decay_t<SO> so = std::forward<SO>(source);
54         state->on_subscribe = [so](subscriber<T> o) mutable {
55             so.on_subscribe(std::move(o));
56         };
57     }
58 
59     struct tag_function {};
60     template<class F>
construct(F && f,tag_function &&)61     void construct(F&& f, tag_function&&) {
62         state->on_subscribe = std::forward<F>(f);
63     }
64 
65 public:
66 
67     typedef tag_dynamic_observable dynamic_observable_tag;
68 
dynamic_observable()69     dynamic_observable()
70     {
71     }
72 
73     template<class SOF>
dynamic_observable(SOF && sof,typename std::enable_if<!is_dynamic_observable<SOF>::value,void ** >::type=0)74     explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75         : state(std::make_shared<state_type>())
76     {
77         construct(std::forward<SOF>(sof),
78                   typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79     }
80 
on_subscribe(subscriber<T> o) const81     void on_subscribe(subscriber<T> o) const {
82         state->on_subscribe(std::move(o));
83     }
84 
85     template<class Subscriber>
86     typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
on_subscribe(Subscriber o) const87     on_subscribe(Subscriber o) const {
88         state->on_subscribe(o.as_dynamic());
89     }
90 };
91 
92 template<class T>
operator ==(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94     return lhs.state == rhs.state;
95 }
96 template<class T>
operator !=(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98     return !(lhs == rhs);
99 }
100 
101 template<class T, class Source>
make_observable_dynamic(Source && s)102 observable<T> make_observable_dynamic(Source&& s) {
103     return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105 
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109 
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113     typedef typename SO::type type;
114     typedef typename type::value_type value_type;
115     static const bool value = true;
116     typedef observable<value_type, type> observable_type;
117     template<class... AN>
makerxcpp::detail::resolve_observable118     static observable_type make(const Default&, AN&&... an) {
119         return observable_type(type(std::forward<AN>(an)...));
120     }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125     static const bool value = false;
126     typedef Default observable_type;
127     template<class... AN>
makerxcpp::detail::resolve_observable128     static observable_type make(const observable_type& that, const AN&...) {
129         return that;
130     }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135     typedef typename SO::type type;
136     typedef typename type::value_type value_type;
137     static const bool value = true;
138     typedef observable<value_type, type> observable_type;
139     template<class... AN>
makerxcpp::detail::resolve_observable140     static observable_type make(AN&&... an) {
141         return observable_type(type(std::forward<AN>(an)...));
142     }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147     static const bool value = false;
148     typedef void observable_type;
149     template<class... AN>
makerxcpp::detail::resolve_observable150     static observable_type make(const AN&...) {
151     }
152 };
153 
154 }
155 
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
157 struct defer_observable
158     : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161 
162 /*!
163     \brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.
164 
165     \ingroup group-observable
166 
167 */
168 template<class T, class Observable>
169 class blocking_observable
170 {
171     template<class Obsvbl, class... ArgN>
blocking_subscribe(const Obsvbl & source,bool do_rethrow,ArgN &&...an)172     static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173         -> void {
174         std::mutex lock;
175         std::condition_variable wake;
176         bool disposed = false;
177 
178         auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
179 
180         rxu::error_ptr error;
181         bool has_error = false;
182 
183         // keep any error to rethrow at the end.
184         // copy 'dest' by-value to avoid using it after it goes out of scope.
185         auto scbr = make_subscriber<T>(
186             dest,
187             [dest](T t){dest.on_next(t);},
188             [dest,&error,&has_error,do_rethrow](rxu::error_ptr e){
189                 if (do_rethrow) {
190                     has_error = true;
191                     error = e;
192                 } else {
193                     dest.on_error(e);
194                 }
195             },
196             [dest](){dest.on_completed();}
197             );
198 
199         auto cs = scbr.get_subscription();
200         cs.add(
201             [&](){
202                 std::unique_lock<std::mutex> guard(lock);
203                 wake.notify_one();
204                 disposed = true;
205             });
206 
207         source.subscribe(std::move(scbr));
208 
209         std::unique_lock<std::mutex> guard(lock);
210         wake.wait(guard,
211             [&](){
212                 return disposed;
213             });
214 
215         if (has_error) {rxu::rethrow_exception(error);}
216     }
217 
218 public:
219     typedef rxu::decay_t<Observable> observable_type;
220     observable_type source;
~blocking_observable()221     ~blocking_observable()
222     {
223     }
blocking_observable(observable_type s)224     blocking_observable(observable_type s) : source(std::move(s)) {}
225 
226     ///
227     /// `subscribe` will cause this observable to emit values to the provided subscriber.
228     ///
229     /// \return void
230     ///
231     /// \param an... - the arguments are passed to make_subscriber().
232     ///
233     /// callers must provide enough arguments to make a subscriber.
234     /// overrides are supported. thus
235     ///   `subscribe(thesubscriber, composite_subscription())`
236     /// will take `thesubscriber.get_observer()` and the provided
237     /// subscription and subscribe to the new subscriber.
238     /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
239     /// if a subscription or subscriber is not provided then a new subscription will be created.
240     ///
241     template<class... ArgN>
subscribe(ArgN &&...an) const242     auto subscribe(ArgN&&... an) const
243         -> void {
244         return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
245     }
246 
247     ///
248     /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber.
249     ///
250     /// \note  If the source observable calls on_error, the raised exception is rethrown by this method.
251     ///
252     /// \note  If the source observable calls on_error, the `on_error` method on the subscriber will not be called.
253     ///
254     /// \return void
255     ///
256     /// \param an... - the arguments are passed to make_subscriber().
257     ///
258     /// callers must provide enough arguments to make a subscriber.
259     /// overrides are supported. thus
    ///   `subscribe_with_rethrow(thesubscriber, composite_subscription())`
261     /// will take `thesubscriber.get_observer()` and the provided
262     /// subscription and subscribe to the new subscriber.
263     /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
264     /// if a subscription or subscriber is not provided then a new subscription will be created.
265     ///
266     template<class... ArgN>
subscribe_with_rethrow(ArgN &&...an) const267     auto subscribe_with_rethrow(ArgN&&... an) const
268         -> void {
269         return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
270     }
271 
272     /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
273 
274         \return  The first item emitted by this blocking_observable.
275 
276         \note  If the source observable calls on_error, the raised exception is rethrown by this method.
277 
278         \sample
279         When the source observable emits at least one item:
280         \snippet blocking_observable.cpp blocking first sample
281         \snippet output.txt blocking first sample
282 
283         When the source observable is empty:
284         \snippet blocking_observable.cpp blocking first empty sample
285         \snippet output.txt blocking first empty sample
286     */
287     template<class... AN>
first(AN ** ...)288     auto first(AN**...) -> delayed_type_t<T, AN...> const {
289         rxu::maybe<T> result;
290         composite_subscription cs;
291         subscribe_with_rethrow(
292             cs,
293             [&](T v){result.reset(v); cs.unsubscribe();});
294         if (result.empty())
295             rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
296         return result.get();
297         static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
298     }
299 
300     /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
301 
302         \return  The last item emitted by this blocking_observable.
303 
304         \note  If the source observable calls on_error, the raised exception is rethrown by this method.
305 
306         \sample
307         When the source observable emits at least one item:
308         \snippet blocking_observable.cpp blocking last sample
309         \snippet output.txt blocking last sample
310 
311         When the source observable is empty:
312         \snippet blocking_observable.cpp blocking last empty sample
313         \snippet output.txt blocking last empty sample
314     */
315     template<class... AN>
last(AN ** ...)316     auto last(AN**...) -> delayed_type_t<T, AN...> const {
317         rxu::maybe<T> result;
318         subscribe_with_rethrow(
319             [&](T v){result.reset(v);});
320         if (result.empty())
321             rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
322         return result.get();
323         static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
324     }
325 
326     /*! Return the total number of items emitted by this blocking_observable.
327 
328         \return  The total number of items emitted by this blocking_observable.
329 
330         \sample
331         \snippet blocking_observable.cpp blocking count sample
332         \snippet output.txt blocking count sample
333 
334         When the source observable calls on_error:
335         \snippet blocking_observable.cpp blocking count error sample
336         \snippet output.txt blocking count error sample
337     */
count() const338     int count() const {
339         int result = 0;
340         source.count().as_blocking().subscribe_with_rethrow(
341             [&](int v){result = v;});
342         return result;
343     }
344 
345     /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
346 
347         \return  The sum of all items emitted by this blocking_observable.
348 
349         \sample
350         When the source observable emits at least one item:
351         \snippet blocking_observable.cpp blocking sum sample
352         \snippet output.txt blocking sum sample
353 
354         When the source observable is empty:
355         \snippet blocking_observable.cpp blocking sum empty sample
356         \snippet output.txt blocking sum empty sample
357 
358         When the source observable calls on_error:
359         \snippet blocking_observable.cpp blocking sum error sample
360         \snippet output.txt blocking sum error sample
361     */
sum() const362     T sum() const {
363         return source.sum().as_blocking().last();
364     }
365 
366     /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
367 
368         \return  The average value of all items emitted by this blocking_observable.
369 
370         \sample
371         When the source observable emits at least one item:
372         \snippet blocking_observable.cpp blocking average sample
373         \snippet output.txt blocking average sample
374 
375         When the source observable is empty:
376         \snippet blocking_observable.cpp blocking average empty sample
377         \snippet output.txt blocking average empty sample
378 
379         When the source observable calls on_error:
380         \snippet blocking_observable.cpp blocking average error sample
381         \snippet output.txt blocking average error sample
382     */
average() const383     double average() const {
384         return source.average().as_blocking().last();
385     }
386 
387     /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
388 
389     \return  The max of all items emitted by this blocking_observable.
390 
391     \sample
392     When the source observable emits at least one item:
393     \snippet blocking_observable.cpp blocking max sample
394     \snippet output.txt blocking max sample
395 
396     When the source observable is empty:
397     \snippet blocking_observable.cpp blocking max empty sample
398     \snippet output.txt blocking max empty sample
399 
400     When the source observable calls on_error:
401     \snippet blocking_observable.cpp blocking max error sample
402     \snippet output.txt blocking max error sample
403 */
max() const404     T max() const {
405         return source.max().as_blocking().last();
406     }
407 
408     /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
409 
410     \return  The min of all items emitted by this blocking_observable.
411 
412     \sample
413     When the source observable emits at least one item:
414     \snippet blocking_observable.cpp blocking min sample
415     \snippet output.txt blocking min sample
416 
417     When the source observable is empty:
418     \snippet blocking_observable.cpp blocking min empty sample
419     \snippet output.txt blocking min empty sample
420 
421     When the source observable calls on_error:
422     \snippet blocking_observable.cpp blocking min error sample
423     \snippet output.txt blocking min error sample
424 */
min() const425     T min() const {
426         return source.min().as_blocking().last();
427     }
428 };
429 
430 namespace detail {
431 
432 template<class SourceOperator, class Subscriber>
433 struct safe_subscriber
434 {
safe_subscriberrxcpp::detail::safe_subscriber435     safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
436 
subscriberxcpp::detail::safe_subscriber437     void subscribe() {
438         RXCPP_TRY {
439             so->on_subscribe(*o);
440         } RXCPP_CATCH(...) {
441             if (!o->is_subscribed()) {
442               rxu::rethrow_current_exception();
443             }
444             o->on_error(rxu::make_error_ptr(rxu::current_exception()));
445             o->unsubscribe();
446         }
447     }
448 
operator ()rxcpp::detail::safe_subscriber449     void operator()(const rxsc::schedulable&) {
450         subscribe();
451     }
452 
453     SourceOperator* so;
454     Subscriber* o;
455 };
456 
457 }
458 
// declaration only: announces the explicit specialization observable<void, void>
// ahead of the primary template's definition below. Its definition is not in this
// part of the file.
template<>
class observable<void, void>;
461 
462 /*!
463     \defgroup group-observable Observables
464 
465     \brief These are the set of observable classes in rxcpp.
466 
467     \class rxcpp::observable
468 
469     \ingroup group-observable group-core
470 
471     \brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
472 
473     \par Some code
    This sample will observable::subscribe() to values from an observable<void, void>::range().
475 
476     \sample
477     \snippet range.cpp range sample
478     \snippet output.txt range sample
479 
480 */
481 template<class T, class SourceOperator>
482 class observable
483     : public observable_base<T>
484 {
485     static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
486 
487     typedef observable<T, SourceOperator> this_type;
488 
489 public:
490     typedef rxu::decay_t<SourceOperator> source_operator_type;
491     mutable source_operator_type source_operator;
492 
493 private:
494 
495     template<class U, class SO>
496     friend class observable;
497 
498     template<class U, class SO>
499     friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
500 
501     template<class Subscriber>
detail_subscribe(Subscriber o) const502     auto detail_subscribe(Subscriber o) const
503         -> composite_subscription {
504 
505         typedef rxu::decay_t<Subscriber> subscriber_type;
506 
507         static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
508         static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
509         static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
510 
511         trace_activity().subscribe_enter(*this, o);
512 
513         if (!o.is_subscribed()) {
514             trace_activity().subscribe_return(*this);
515             return o.get_subscription();
516         }
517 
518         detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
519 
520         // make sure to let current_thread take ownership of the thread as early as possible.
521         if (rxsc::current_thread::is_schedule_required()) {
522             const auto& sc = rxsc::make_current_thread();
523             sc.create_worker(o.get_subscription()).schedule(subscriber);
524         } else {
525             // current_thread already owns this thread.
526             subscriber.subscribe();
527         }
528 
529         trace_activity().subscribe_return(*this);
530         return o.get_subscription();
531     }
532 
533 public:
534     typedef T value_type;
535 
536     static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
537 
~observable()538     ~observable()
539     {
540     }
541 
observable()542     observable()
543     {
544     }
545 
observable(const source_operator_type & o)546     explicit observable(const source_operator_type& o)
547         : source_operator(o)
548     {
549     }
observable(source_operator_type && o)550     explicit observable(source_operator_type&& o)
551         : source_operator(std::move(o))
552     {
553     }
554 
555     /// implicit conversion between observables of the same value_type
556     template<class SO>
observable(const observable<T,SO> & o)557     observable(const observable<T, SO>& o)
558         : source_operator(o.source_operator)
559     {}
560     /// implicit conversion between observables of the same value_type
561     template<class SO>
observable(observable<T,SO> && o)562     observable(observable<T, SO>&& o)
563         : source_operator(std::move(o.source_operator))
564     {}
565 
566 #if 0
567     template<class I>
568     void on_subscribe(observer<T, I> o) const {
569         source_operator.on_subscribe(o);
570     }
571 #endif
572 
573     /*! @copydoc rxcpp::operators::as_dynamic
574      */
575     template<class... AN>
as_dynamic(AN ** ...) const576     observable<T> as_dynamic(AN**...) const {
577         return *this;
578         static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
579     }
580 
581     /*! @copydoc rx-ref_count.hpp
582      */
583     template<class... AN>
ref_count(AN...an) const584     auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
585         /// \cond SHOW_SERVICE_MEMBERS
586         -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
587         /// \endcond
588     {
589         return      observable_member(ref_count_tag{},                *this, std::forward<AN>(an)...);
590     }
591 
592     /*! @copydoc rxcpp::operators::as_blocking
593      */
594     template<class... AN>
as_blocking(AN ** ...) const595     blocking_observable<T, this_type> as_blocking(AN**...) const {
596         return blocking_observable<T, this_type>(*this);
597         static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
598     }
599 
600     /// \cond SHOW_SERVICE_MEMBERS
601 
602     ///
603     /// takes any function that will take this observable and produce a result value.
604     /// this is intended to allow externally defined operators, that use subscribe,
605     /// to be connected into the expression.
606     ///
607     template<class OperatorFactory>
op(OperatorFactory && of) const608     auto op(OperatorFactory&& of) const
609         -> decltype(of(*(const this_type*)nullptr)) {
610         return      of(*this);
611         static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
612     }
613 
614     /*! @copydoc rx-lift.hpp
615      */
616     template<class ResultType, class Operator>
lift(Operator && op) const617     auto lift(Operator&& op) const
618         ->      observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
619         return  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
620                                                                                                                       rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
621         static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
622     }
623 
624     ///
625     /// takes any function that will take a subscriber for this observable and produce a subscriber.
626     /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
627     /// into the expression.
628     ///
629     template<class ResultType, class Operator>
lift_if(Operator && op) const630     auto lift_if(Operator&& op) const
631         -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
632             observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
633         return  observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
634                                                                                                                       rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
635     }
636     ///
637     /// takes any function that will take a subscriber for this observable and produce a subscriber.
638     /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
639     /// into the expression.
640     ///
641     template<class ResultType, class Operator>
lift_if(Operator &&) const642     auto lift_if(Operator&&) const
643         -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
644             decltype(rxs::from<ResultType>())>::type {
645         return       rxs::from<ResultType>();
646     }
647     /// \endcond
648 
649     /*! @copydoc rx-subscribe.hpp
650      */
651     template<class... ArgN>
subscribe(ArgN &&...an) const652     auto subscribe(ArgN&&... an) const
653         -> composite_subscription {
654         return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
655     }
656 
657     /*! @copydoc rx-all.hpp
658      */
659     template<class... AN>
all(AN &&...an) const660     auto all(AN&&... an) const
661     /// \cond SHOW_SERVICE_MEMBERS
662     -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
663     /// \endcond
664     {
665         return  observable_member(all_tag{},                *this, std::forward<AN>(an)...);
666     }
667 
668     /*! @copydoc rxcpp::operators::is_empty
669      */
670     template<class... AN>
is_empty(AN &&...an) const671     auto is_empty(AN&&... an) const
672     /// \cond SHOW_SERVICE_MEMBERS
673     -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
674     /// \endcond
675     {
676         return  observable_member(is_empty_tag{},                *this, std::forward<AN>(an)...);
677     }
678 
679     /*! @copydoc rx-any.hpp
680      */
681     template<class... AN>
any(AN &&...an) const682     auto any(AN&&... an) const
683     /// \cond SHOW_SERVICE_MEMBERS
684     -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
685     /// \endcond
686     {
687         return  observable_member(any_tag{},                *this, std::forward<AN>(an)...);
688     }
689 
690     /*! @copydoc rxcpp::operators::exists
691      */
692     template<class... AN>
exists(AN &&...an) const693     auto exists(AN&&... an) const
694     /// \cond SHOW_SERVICE_MEMBERS
695     -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
696     /// \endcond
697     {
698         return  observable_member(exists_tag{},                *this, std::forward<AN>(an)...);
699     }
700 
701     /*! @copydoc rxcpp::operators::contains
702      */
703     template<class... AN>
contains(AN &&...an) const704     auto contains(AN&&... an) const
705     /// \cond SHOW_SERVICE_MEMBERS
706     -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
707     /// \endcond
708     {
709         return  observable_member(contains_tag{},                *this, std::forward<AN>(an)...);
710     }
711 
712     /*! @copydoc rx-filter.hpp
713      */
714     template<class... AN>
filter(AN &&...an) const715     auto filter(AN&&... an) const
716     /// \cond SHOW_SERVICE_MEMBERS
717     -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
718     /// \endcond
719     {
720         return  observable_member(filter_tag{}, *this,                std::forward<AN>(an)...);
721     }
722 
723     /*! @copydoc rx-switch_if_empty.hpp
724     */
725     template<class... AN>
switch_if_empty(AN &&...an) const726     auto switch_if_empty(AN&&... an) const
727         /// \cond SHOW_SERVICE_MEMBERS
728         -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
729         /// \endcond
730     {
731         return      observable_member(switch_if_empty_tag{},                *this, std::forward<AN>(an)...);
732     }
733 
734     /*! @copydoc rxcpp::operators::default_if_empty
735     */
736     template<class... AN>
default_if_empty(AN &&...an) const737     auto default_if_empty(AN&&... an) const
738         /// \cond SHOW_SERVICE_MEMBERS
739         -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
740         /// \endcond
741     {
742         return      observable_member(default_if_empty_tag{},                *this, std::forward<AN>(an)...);
743     }
744 
745     /*! @copydoc rx-sequence_equal.hpp
746      */
747     template<class... AN>
sequence_equal(AN...an) const748     auto sequence_equal(AN... an) const
749         /// \cond SHOW_SERVICE_MEMBERS
750         -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
751         /// \endcond
752     {
753         return      observable_member(sequence_equal_tag{},                *this, std::forward<AN>(an)...);
754     }
755 
756     /*! @copydoc rx-tap.hpp
757      */
758     template<class... AN>
tap(AN &&...an) const759     auto tap(AN&&... an) const
760         /// \cond SHOW_SERVICE_MEMBERS
761         -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
762         /// \endcond
763     {
764         return      observable_member(tap_tag{},                *this, std::forward<AN>(an)...);
765     }
766 
767     /*! @copydoc rx-time_interval.hpp
768      */
769     template<class... AN>
time_interval(AN &&...an) const770     auto time_interval(AN&&... an) const
771     /// \cond SHOW_SERVICE_MEMBERS
772     -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
773     /// \endcond
774     {
775         return  observable_member(time_interval_tag{},                *this, std::forward<AN>(an)...);
776     }
777 
778     /*! @copydoc rx-timeout.hpp
779      */
780     template<class... AN>
timeout(AN &&...an) const781     auto timeout(AN&&... an) const
782     /// \cond SHOW_SERVICE_MEMBERS
783     -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
784     /// \endcond
785     {
786         return  observable_member(timeout_tag{},                *this, std::forward<AN>(an)...);
787     }
788 
789     /*! @copydoc rx-timestamp.hpp
790      */
791     template<class... AN>
timestamp(AN &&...an) const792     auto timestamp(AN&&... an) const
793     /// \cond SHOW_SERVICE_MEMBERS
794     -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
795     /// \endcond
796     {
797         return  observable_member(timestamp_tag{},                *this, std::forward<AN>(an)...);
798     }
799 
800     /*! @copydoc rx-finally.hpp
801      */
802     template<class... AN>
finally(AN &&...an) const803     auto finally(AN&&... an) const
804         /// \cond SHOW_SERVICE_MEMBERS
805         -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
806         /// \endcond
807     {
808         return      observable_member(finally_tag{},                *this, std::forward<AN>(an)...);
809     }
810 
811     /*! @copydoc rx-on_error_resume_next.hpp
812     */
813     template<class... AN>
on_error_resume_next(AN &&...an) const814     auto on_error_resume_next(AN&&... an) const
815     /// \cond SHOW_SERVICE_MEMBERS
816     -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
817     /// \endcond
818     {
819         return  observable_member(on_error_resume_next_tag{},                *this, std::forward<AN>(an)...);
820     }
821 
822     /*! @copydoc rx-on_error_resume_next.hpp
823     */
824     template<class... AN>
switch_on_error(AN &&...an) const825     auto switch_on_error(AN&&... an) const
826     /// \cond SHOW_SERVICE_MEMBERS
827     -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
828     /// \endcond
829     {
830         return  observable_member(on_error_resume_next_tag{},                *this, std::forward<AN>(an)...);
831     }
832 
833     /*! @copydoc rx-map.hpp
834      */
835     template<class... AN>
map(AN &&...an) const836     auto map(AN&&... an) const
837     /// \cond SHOW_SERVICE_MEMBERS
838     -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
839     /// \endcond
840     {
841         return  observable_member(map_tag{},                *this, std::forward<AN>(an)...);
842     }
843 
844     /*! @copydoc rx-map.hpp
845      */
846     template<class... AN>
transform(AN &&...an) const847     auto transform(AN&&... an) const
848     /// \cond SHOW_SERVICE_MEMBERS
849     -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
850     /// \endcond
851     {
852         return  observable_member(map_tag{},                *this, std::forward<AN>(an)...);
853     }
854 
855     /*! @copydoc rx-debounce.hpp
856      */
857     template<class... AN>
debounce(AN &&...an) const858     auto debounce(AN&&... an) const
859     /// \cond SHOW_SERVICE_MEMBERS
860     -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
861     /// \endcond
862     {
863         return  observable_member(debounce_tag{},                *this, std::forward<AN>(an)...);
864     }
865 
866     /*! @copydoc rx-delay.hpp
867      */
868     template<class... AN>
delay(AN &&...an) const869     auto delay(AN&&... an) const
870     /// \cond SHOW_SERVICE_MEMBERS
871     -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
872     /// \endcond
873     {
874         return  observable_member(delay_tag{},                *this, std::forward<AN>(an)...);
875     }
876 
877     /*! @copydoc rx-distinct.hpp
878      */
879     template<class... AN>
distinct(AN &&...an) const880     auto distinct(AN&&... an) const
881     /// \cond SHOW_SERVICE_MEMBERS
882     -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
883     /// \endcond
884     {
885         return  observable_member(distinct_tag{},                *this, std::forward<AN>(an)...);
886     }
887 
888     /*! @copydoc rx-distinct_until_changed.hpp
889      */
890     template<class... AN>
distinct_until_changed(AN &&...an) const891     auto distinct_until_changed(AN&&... an) const
892     /// \cond SHOW_SERVICE_MEMBERS
893     -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
894     /// \endcond
895     {
896         return  observable_member(distinct_until_changed_tag{},                *this, std::forward<AN>(an)...);
897     }
898 
899     /*! @copydoc rx-element_at.hpp
900      */
901     template<class... AN>
element_at(AN &&...an) const902     auto element_at(AN&&... an) const
903     /// \cond SHOW_SERVICE_MEMBERS
904     -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
905     /// \endcond
906     {
907         return  observable_member(element_at_tag{},                *this, std::forward<AN>(an)...);
908     }
909 
910     /*! @copydoc rx-window.hpp
911     */
912     template<class... AN>
window(AN &&...an) const913     auto window(AN&&... an) const
914     /// \cond SHOW_SERVICE_MEMBERS
915     -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
916     /// \endcond
917     {
918         return  observable_member(window_tag{},                *this, std::forward<AN>(an)...);
919     }
920 
921     /*! @copydoc rx-window_time.hpp
922     */
923     template<class... AN>
window_with_time(AN &&...an) const924     auto window_with_time(AN&&... an) const
925     /// \cond SHOW_SERVICE_MEMBERS
926     -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
927     /// \endcond
928     {
929         return  observable_member(window_with_time_tag{},                *this, std::forward<AN>(an)...);
930     }
931 
932     /*! @copydoc rx-window_time_count.hpp
933     */
934     template<class... AN>
window_with_time_or_count(AN &&...an) const935     auto window_with_time_or_count(AN&&... an) const
936     /// \cond SHOW_SERVICE_MEMBERS
937     -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
938     /// \endcond
939     {
940         return  observable_member(window_with_time_or_count_tag{},                *this, std::forward<AN>(an)...);
941     }
942 
943     /*! @copydoc rx-window_toggle.hpp
944     */
945     template<class... AN>
window_toggle(AN &&...an) const946     auto window_toggle(AN&&... an) const
947     /// \cond SHOW_SERVICE_MEMBERS
948     -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
949     /// \endcond
950     {
951         return  observable_member(window_toggle_tag{},                *this, std::forward<AN>(an)...);
952     }
953 
954     /*! @copydoc rx-buffer_count.hpp
955      */
956     template<class... AN>
buffer(AN &&...an) const957     auto buffer(AN&&... an) const
958     /// \cond SHOW_SERVICE_MEMBERS
959     -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
960     /// \endcond
961     {
962         return  observable_member(buffer_count_tag{},                *this, std::forward<AN>(an)...);
963     }
964 
965     /*! @copydoc rx-buffer_time.hpp
966      */
967     template<class... AN>
buffer_with_time(AN &&...an) const968     auto buffer_with_time(AN&&... an) const
969     /// \cond SHOW_SERVICE_MEMBERS
970     -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
971     /// \endcond
972     {
973         return  observable_member(buffer_with_time_tag{},                *this, std::forward<AN>(an)...);
974     }
975 
976     /*! @copydoc rx-buffer_time_count.hpp
977      */
978     template<class... AN>
buffer_with_time_or_count(AN &&...an) const979     auto buffer_with_time_or_count(AN&&... an) const
980     /// \cond SHOW_SERVICE_MEMBERS
981     -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
982     /// \endcond
983     {
984         return  observable_member(buffer_with_time_or_count_tag{},                *this, std::forward<AN>(an)...);
985     }
986 
987     /*! @copydoc rx-switch_on_next.hpp
988     */
989     template<class... AN>
switch_on_next(AN &&...an) const990     auto switch_on_next(AN&&... an) const
991         /// \cond SHOW_SERVICE_MEMBERS
992         -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
993         /// \endcond
994     {
995         return      observable_member(switch_on_next_tag{},                *this, std::forward<AN>(an)...);
996     }
997 
998     /*! @copydoc rx-merge.hpp
999      */
1000     template<class... AN>
merge(AN...an) const1001     auto merge(AN... an) const
1002         /// \cond SHOW_SERVICE_MEMBERS
1003         -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1004         /// \endcond
1005     {
1006         return      observable_member(merge_tag{},                *this, std::forward<AN>(an)...);
1007     }
1008 
1009     /*! @copydoc rx-merge_delay_error.hpp
1010      */
1011     template<class... AN>
merge_delay_error(AN...an) const1012     auto merge_delay_error(AN... an) const
1013         /// \cond SHOW_SERVICE_MEMBERS
1014         -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1015         /// \endcond
1016     {
1017             return      observable_member(merge_delay_error_tag{},                *this, std::forward<AN>(an)...);
1018     }
1019 
1020     /*! @copydoc rx-amb.hpp
1021      */
1022     template<class... AN>
amb(AN...an) const1023     auto amb(AN... an) const
1024         /// \cond SHOW_SERVICE_MEMBERS
1025         -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1026         /// \endcond
1027     {
1028         return      observable_member(amb_tag{},                *this, std::forward<AN>(an)...);
1029     }
1030 
1031     /*! @copydoc rx-flat_map.hpp
1032      */
1033     template<class... AN>
flat_map(AN &&...an) const1034     auto flat_map(AN&&... an) const
1035         /// \cond SHOW_SERVICE_MEMBERS
1036         -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1037         /// \endcond
1038     {
1039         return      observable_member(flat_map_tag{},                *this, std::forward<AN>(an)...);
1040     }
1041 
1042     /*! @copydoc rx-flat_map.hpp
1043      */
1044     template<class... AN>
merge_transform(AN &&...an) const1045     auto merge_transform(AN&&... an) const
1046         /// \cond SHOW_SERVICE_MEMBERS
1047         -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1048         /// \endcond
1049     {
1050         return      observable_member(flat_map_tag{},                *this, std::forward<AN>(an)...);
1051     }
1052 
1053     /*! @copydoc rx-concat.hpp
1054      */
1055     template<class... AN>
concat(AN...an) const1056     auto concat(AN... an) const
1057         /// \cond SHOW_SERVICE_MEMBERS
1058         -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1059         /// \endcond
1060     {
1061         return      observable_member(concat_tag{},                *this, std::forward<AN>(an)...);
1062     }
1063 
1064     /*! @copydoc rx-concat_map.hpp
1065      */
1066     template<class... AN>
concat_map(AN &&...an) const1067     auto concat_map(AN&&... an) const
1068         /// \cond SHOW_SERVICE_MEMBERS
1069         -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1070         /// \endcond
1071     {
1072         return      observable_member(concat_map_tag{},                *this, std::forward<AN>(an)...);
1073     }
1074 
1075     /*! @copydoc rx-concat_map.hpp
1076      */
1077     template<class... AN>
concat_transform(AN &&...an) const1078     auto concat_transform(AN&&... an) const
1079         /// \cond SHOW_SERVICE_MEMBERS
1080         -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1081         /// \endcond
1082     {
1083         return      observable_member(concat_map_tag{},                *this, std::forward<AN>(an)...);
1084     }
1085 
1086     /*! @copydoc rx-with_latest_from.hpp
1087      */
1088     template<class... AN>
with_latest_from(AN...an) const1089     auto with_latest_from(AN... an) const
1090         /// \cond SHOW_SERVICE_MEMBERS
1091         -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1092         /// \endcond
1093     {
1094         return      observable_member(with_latest_from_tag{},                *this, std::forward<AN>(an)...);
1095     }
1096 
1097 
1098     /*! @copydoc rx-combine_latest.hpp
1099      */
1100     template<class... AN>
combine_latest(AN...an) const1101     auto combine_latest(AN... an) const
1102         /// \cond SHOW_SERVICE_MEMBERS
1103         -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1104         /// \endcond
1105     {
1106         return      observable_member(combine_latest_tag{},                *this, std::forward<AN>(an)...);
1107     }
1108 
1109     /*! @copydoc rx-zip.hpp
1110      */
1111     template<class... AN>
zip(AN &&...an) const1112     auto zip(AN&&... an) const
1113         /// \cond SHOW_SERVICE_MEMBERS
1114         -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1115         /// \endcond
1116     {
1117         return      observable_member(zip_tag{},                *this, std::forward<AN>(an)...);
1118     }
1119 
1120     /*! @copydoc rx-group_by.hpp
1121      */
1122     template<class... AN>
group_by(AN &&...an) const1123     inline auto group_by(AN&&... an) const
1124         /// \cond SHOW_SERVICE_MEMBERS
1125         -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1126         /// \endcond
1127     {
1128         return      observable_member(group_by_tag{},                *this, std::forward<AN>(an)...);
1129     }
1130 
1131     /*! @copydoc rx-ignore_elements.hpp
1132      */
1133     template<class... AN>
ignore_elements(AN &&...an) const1134     auto ignore_elements(AN&&... an) const
1135     /// \cond SHOW_SERVICE_MEMBERS
1136     -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1137     /// \endcond
1138     {
1139         return  observable_member(ignore_elements_tag{},                *this, std::forward<AN>(an)...);
1140     }
1141 
1142     /*! @copydoc rx-muticast.hpp
1143      */
1144     template<class... AN>
multicast(AN &&...an) const1145     auto multicast(AN&&... an) const
1146         /// \cond SHOW_SERVICE_MEMBERS
1147         -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1148         /// \endcond
1149     {
1150         return      observable_member(multicast_tag{},                *this, std::forward<AN>(an)...);
1151     }
1152 
1153     /*! @copydoc rx-publish.hpp
1154      */
1155     template<class... AN>
publish(AN &&...an) const1156     auto publish(AN&&... an) const
1157         /// \cond SHOW_SERVICE_MEMBERS
1158         -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1159         /// \endcond
1160     {
1161         return      observable_member(publish_tag{},                *this, std::forward<AN>(an)...);
1162     }
1163 
1164     /*! @copydoc rxcpp::operators::publish_synchronized
1165      */
1166     template<class... AN>
publish_synchronized(AN &&...an) const1167     auto publish_synchronized(AN&&... an) const
1168         /// \cond SHOW_SERVICE_MEMBERS
1169         -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1170         /// \endcond
1171     {
1172         return      observable_member(publish_synchronized_tag{},                *this, std::forward<AN>(an)...);
1173     }
1174 
1175     /*! @copydoc rx-replay.hpp
1176      */
1177     template<class... AN>
replay(AN &&...an) const1178     auto replay(AN&&... an) const
1179         /// \cond SHOW_SERVICE_MEMBERS
1180         -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1181         /// \endcond
1182     {
1183         return      observable_member(replay_tag{},                *this, std::forward<AN>(an)...);
1184     }
1185 
1186     /*! @copydoc rx-subscribe_on.hpp
1187      */
1188     template<class... AN>
subscribe_on(AN &&...an) const1189     auto subscribe_on(AN&&... an) const
1190         /// \cond SHOW_SERVICE_MEMBERS
1191         -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1192         /// \endcond
1193     {
1194         return      observable_member(subscribe_on_tag{},                *this, std::forward<AN>(an)...);
1195     }
1196 
1197     /*! @copydoc rx-observe_on.hpp
1198     */
1199     template<class... AN>
observe_on(AN &&...an) const1200     auto observe_on(AN&&... an) const
1201         /// \cond SHOW_SERVICE_MEMBERS
1202         -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1203         /// \endcond
1204     {
1205         return      observable_member(observe_on_tag{},                *this, std::forward<AN>(an)...);
1206     }
1207 
1208     /*! @copydoc rx-reduce.hpp
1209      */
1210     template<class... AN>
reduce(AN &&...an) const1211     auto reduce(AN&&... an) const
1212         /// \cond SHOW_SERVICE_MEMBERS
1213         -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1214         /// \endcond
1215     {
1216         return      observable_member(reduce_tag{},                *this, std::forward<AN>(an)...);
1217     }
1218 
1219     /*! @copydoc rx-reduce.hpp
1220      */
1221     template<class... AN>
accumulate(AN &&...an) const1222     auto accumulate(AN&&... an) const
1223         /// \cond SHOW_SERVICE_MEMBERS
1224         -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1225         /// \endcond
1226     {
1227         return      observable_member(reduce_tag{},                *this, std::forward<AN>(an)...);
1228     }
1229 
1230     /*! @copydoc rxcpp::operators::first
1231      */
1232     template<class... AN>
first(AN ** ...) const1233     auto first(AN**...) const
1234         /// \cond SHOW_SERVICE_MEMBERS
1235         -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr))
1236         /// \endcond
1237     {
1238         return      observable_member(delayed_type<first_tag, AN...>::value(),                *this);
1239         static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1240     }
1241 
1242     /*! @copydoc rxcpp::operators::last
1243      */
1244     template<class... AN>
last(AN ** ...) const1245     auto last(AN**...) const
1246         /// \cond SHOW_SERVICE_MEMBERS
1247         -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr))
1248         /// \endcond
1249     {
1250         return      observable_member(delayed_type<last_tag, AN...>::value(),                *this);
1251         static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1252     }
1253 
1254     /*! @copydoc rxcpp::operators::count
1255      */
1256     template<class... AN>
count(AN ** ...) const1257     auto count(AN**...) const
1258         /// \cond SHOW_SERVICE_MEMBERS
1259         -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>()))
1260         /// \endcond
1261     {
1262         return      observable_member(delayed_type<reduce_tag, AN...>::value(),                *this, 0, rxu::count(), identity_for<int>());
1263         static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1264     }
1265 
1266     /*! @copydoc rxcpp::operators::sum
1267      */
1268     template<class... AN>
sum(AN ** ...) const1269     auto sum(AN**...) const
1270         /// \cond SHOW_SERVICE_MEMBERS
1271         -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr))
1272         /// \endcond
1273     {
1274         return      observable_member(delayed_type<sum_tag, AN...>::value(),                *this);
1275         static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1276     }
1277 
1278     /*! @copydoc rxcpp::operators::average
1279      */
1280     template<class... AN>
average(AN ** ...) const1281     auto average(AN**...) const
1282         /// \cond SHOW_SERVICE_MEMBERS
1283         -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr))
1284         /// \endcond
1285     {
1286         return      observable_member(delayed_type<average_tag, AN...>::value(),                *this);
1287         static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1288     }
1289 
1290     /*! @copydoc rxcpp::operators::max
1291      */
1292     template<class... AN>
max(AN ** ...) const1293     auto max(AN**...) const
1294         /// \cond SHOW_SERVICE_MEMBERS
1295         -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr))
1296         /// \endcond
1297     {
1298         return      observable_member(delayed_type<max_tag, AN...>::value(),                *this);
1299         static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1300     }
1301 
1302     /*! @copydoc rxcpp::operators::min
1303      */
1304     template<class... AN>
min(AN ** ...) const1305     auto min(AN**...) const
1306         /// \cond SHOW_SERVICE_MEMBERS
1307         -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr))
1308         /// \endcond
1309     {
1310         return      observable_member(delayed_type<min_tag, AN...>::value(),                *this);
1311         static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1312     }
1313 
1314     /*! @copydoc rx-scan.hpp
1315     */
1316     template<class... AN>
scan(AN...an) const1317     auto scan(AN... an) const
1318         /// \cond SHOW_SERVICE_MEMBERS
1319         -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1320         /// \endcond
1321     {
1322         return      observable_member(scan_tag{},                *this, std::forward<AN>(an)...);
1323     }
1324 
1325     /*! @copydoc rx-sample_time.hpp
1326      */
1327     template<class... AN>
sample_with_time(AN &&...an) const1328     auto sample_with_time(AN&&... an) const
1329         /// \cond SHOW_SERVICE_MEMBERS
1330         -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1331         /// \endcond
1332     {
1333         return      observable_member(sample_with_time_tag{},                *this, std::forward<AN>(an)...);
1334     }
1335 
1336     /*! @copydoc rx-skip.hpp
1337     */
1338     template<class... AN>
skip(AN...an) const1339     auto skip(AN... an) const
1340         /// \cond SHOW_SERVICE_MEMBERS
1341         -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1342         /// \endcond
1343     {
1344         return      observable_member(skip_tag{},                *this, std::forward<AN>(an)...);
1345     }
1346 
1347     /*! @copydoc rx-skip.hpp
1348      */
1349     template<class... AN>
skip_while(AN...an) const1350     auto skip_while(AN... an) const
1351         /// \cond SHOW_SERVICE_MEMBERS
1352         -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1353         /// \endcond
1354     {
1355         return      observable_member(skip_while_tag{},                *this, std::forward<AN>(an)...);
1356     }
1357 
1358     /*! @copydoc rx-skip_last.hpp
1359     */
1360     template<class... AN>
skip_last(AN...an) const1361     auto skip_last(AN... an) const
1362         /// \cond SHOW_SERVICE_MEMBERS
1363         -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1364         /// \endcond
1365     {
1366         return      observable_member(skip_last_tag{},                *this, std::forward<AN>(an)...);
1367     }
1368 
1369     /*! @copydoc rx-skip_until.hpp
1370     */
1371     template<class... AN>
skip_until(AN...an) const1372     auto skip_until(AN... an) const
1373         /// \cond SHOW_SERVICE_MEMBERS
1374         -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1375         /// \endcond
1376     {
1377         return      observable_member(skip_until_tag{},                *this, std::forward<AN>(an)...);
1378     }
1379 
1380     /*! @copydoc rx-take.hpp
1381      */
1382     template<class... AN>
take(AN...an) const1383     auto take(AN... an) const
1384         /// \cond SHOW_SERVICE_MEMBERS
1385         -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1386         /// \endcond
1387     {
1388         return      observable_member(take_tag{},                *this, std::forward<AN>(an)...);
1389     }
1390 
1391     /*! @copydoc rx-take_last.hpp
1392     */
1393     template<class... AN>
take_last(AN &&...an) const1394     auto take_last(AN&&... an) const
1395         /// \cond SHOW_SERVICE_MEMBERS
1396         -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1397         /// \endcond
1398     {
1399         return      observable_member(take_last_tag{},                *this, std::forward<AN>(an)...);
1400     }
1401 
1402     /*! @copydoc rx-take_until.hpp
1403     */
1404     template<class... AN>
take_until(AN &&...an) const1405     auto take_until(AN&&... an) const
1406         /// \cond SHOW_SERVICE_MEMBERS
1407         -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1408         /// \endcond
1409     {
1410         return      observable_member(take_until_tag{},                *this, std::forward<AN>(an)...);
1411     }
1412 
1413     /*! @copydoc rx-take_while.hpp
1414     */
1415     template<class... AN>
take_while(AN &&...an) const1416     auto take_while(AN&&... an) const
1417         /// \cond SHOW_SERVICE_MEMBERS
1418         -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1419         /// \endcond
1420     {
1421         return      observable_member(take_while_tag{},                *this, std::forward<AN>(an)...);
1422     }
1423 
1424     /*! @copydoc rx-repeat.hpp
1425      */
1426     template<class... AN>
repeat(AN...an) const1427     auto repeat(AN... an) const
1428         /// \cond SHOW_SERVICE_MEMBERS
1429         -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1430         /// \endcond
1431     {
1432         return      observable_member(repeat_tag{},                *this, std::forward<AN>(an)...);
1433     }
1434 
1435     /*! @copydoc rx-retry.hpp
1436      */
1437     template<class... AN>
retry(AN...an) const1438     auto retry(AN... an) const
1439         /// \cond SHOW_SERVICE_MEMBERS
1440         -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1441         /// \endcond
1442     {
1443         return      observable_member(retry_tag{},                *(this_type*)this, std::forward<AN>(an)...);
1444     }
1445 
1446     /*! @copydoc rx-start_with.hpp
1447      */
1448     template<class... AN>
start_with(AN...an) const1449     auto start_with(AN... an) const
1450         /// \cond SHOW_SERVICE_MEMBERS
1451         -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1452         /// \endcond
1453     {
1454         return      observable_member(start_with_tag{},                *this, std::forward<AN>(an)...);
1455     }
1456 
1457     /*! @copydoc rx-pairwise.hpp
1458      */
1459     template<class... AN>
pairwise(AN...an) const1460     auto pairwise(AN... an) const
1461         /// \cond SHOW_SERVICE_MEMBERS
1462         -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1463         /// \endcond
1464     {
1465         return      observable_member(pairwise_tag{},                *this, std::forward<AN>(an)...);
1466     }
1467 };
1468 
1469 template<class T, class SourceOperator>
operator ==(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1470 inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1471     return lhs.source_operator == rhs.source_operator;
1472 }
1473 template<class T, class SourceOperator>
operator !=(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1474 inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1475     return !(lhs == rhs);
1476 }
1477 
1478 /*!
1479     \defgroup group-core Basics
1480 
1481     \brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled.
1482 
1483     \class rxcpp::observable<void, void>
1484 
1485     \brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable.
1486 
1487     \ingroup group-core
1488 
1489     \par Create a new type of observable
1490 
1491     \sample
1492     \snippet create.cpp Create sample
1493     \snippet output.txt Create sample
1494 
1495     \par Create an observable that emits a range of values
1496 
1497     \sample
1498     \snippet range.cpp range sample
1499     \snippet output.txt range sample
1500 
1501     \par Create an observable that emits nothing / generates an error / immediately completes
1502 
1503     \sample
1504     \snippet never.cpp never sample
1505     \snippet output.txt never sample
1506     \snippet error.cpp error sample
1507     \snippet output.txt error sample
1508     \snippet empty.cpp empty sample
1509     \snippet output.txt empty sample
1510 
    \par Create an observable that generates a new observable for each subscriber
1512 
1513     \sample
1514     \snippet defer.cpp defer sample
1515     \snippet output.txt defer sample
1516 
1517     \par Create an observable that emits items every specified interval of time
1518 
1519     \sample
1520     \snippet interval.cpp interval sample
1521     \snippet output.txt interval sample
1522 
    \par Create an observable that emits an item after the specified duration
1524 
1525     \sample
1526     \snippet timer.cpp duration timer sample
1527     \snippet output.txt duration timer sample
1528 
1529     \par Create an observable that emits all items from a collection
1530 
1531     \sample
1532     \snippet iterate.cpp iterate sample
1533     \snippet output.txt iterate sample
1534 
1535     \par Create an observable that emits a set of specified items
1536 
1537     \sample
1538     \snippet from.cpp from sample
1539     \snippet output.txt from sample
1540 
1541     \par Create an observable that emits a single item
1542 
1543     \sample
1544     \snippet just.cpp just sample
1545     \snippet output.txt just sample
1546 
1547     \par Create an observable that emits a set of items and then subscribes to another observable
1548 
1549     \sample
1550     \snippet start_with.cpp full start_with sample
1551     \snippet output.txt full start_with sample
1552 
1553     \par Create an observable that generates a new observable based on a generated resource for each subscriber
1554 
1555     \sample
1556     \snippet scope.cpp scope sample
1557     \snippet output.txt scope sample
1558 
1559 */
1560 template<>
1561 class observable<void, void>
1562 {
1563     ~observable();
1564 public:
1565     /*! @copydoc rx-create.hpp
1566      */
1567     template<class T, class OnSubscribe>
create(OnSubscribe os)1568     static auto create(OnSubscribe os)
1569         -> decltype(rxs::create<T>(std::move(os))) {
1570         return      rxs::create<T>(std::move(os));
1571     }
1572 
1573     /*! @copydoc rx-range.hpp
1574      */
1575     template<class T>
range(T first=0,T last=std::numeric_limits<T>::max (),std::ptrdiff_t step=1)1576     static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1577         -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1578         return      rxs::range<T>(first, last, step, identity_current_thread());
1579     }
1580     /*! @copydoc rx-range.hpp
1581      */
1582     template<class T, class Coordination>
range(T first,T last,std::ptrdiff_t step,Coordination cn)1583     static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1584         -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1585         return      rxs::range<T>(first, last, step, std::move(cn));
1586     }
1587     /*! @copydoc rx-range.hpp
1588      */
1589     template<class T, class Coordination>
range(T first,T last,Coordination cn)1590     static auto range(T first, T last, Coordination cn)
1591         -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1592         return      rxs::range<T>(first, last, std::move(cn));
1593     }
1594     /*! @copydoc rx-range.hpp
1595      */
1596     template<class T, class Coordination>
range(T first,Coordination cn)1597     static auto range(T first, Coordination cn)
1598         -> decltype(rxs::range<T>(first, std::move(cn))) {
1599         return      rxs::range<T>(first, std::move(cn));
1600     }
1601 
1602     /*! @copydoc rx-never.hpp
1603      */
1604     template<class T>
never()1605     static auto never()
1606         -> decltype(rxs::never<T>()) {
1607         return      rxs::never<T>();
1608     }
1609 
1610     /*! @copydoc rx-defer.hpp
1611      */
1612     template<class ObservableFactory>
defer(ObservableFactory of)1613     static auto defer(ObservableFactory of)
1614         -> decltype(rxs::defer(std::move(of))) {
1615         return      rxs::defer(std::move(of));
1616     }
1617 
1618     /*! @copydoc rx-interval.hpp
1619      */
1620     template<class... AN>
interval(rxsc::scheduler::clock_type::duration period,AN ** ...)1621     static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1622         -> decltype(rxs::interval(period)) {
1623         return      rxs::interval(period);
1624         static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1625     }
1626     /*! @copydoc rx-interval.hpp
1627      */
1628     template<class Coordination>
interval(rxsc::scheduler::clock_type::duration period,Coordination cn)1629     static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1630         -> decltype(rxs::interval(period, std::move(cn))) {
1631         return      rxs::interval(period, std::move(cn));
1632     }
1633     /*! @copydoc rx-interval.hpp
1634      */
1635     template<class... AN>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,AN ** ...)1636     static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1637         -> decltype(rxs::interval(initial, period)) {
1638         return      rxs::interval(initial, period);
1639         static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1640     }
1641     /*! @copydoc rx-interval.hpp
1642      */
1643     template<class Coordination>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,Coordination cn)1644     static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1645         -> decltype(rxs::interval(initial, period, std::move(cn))) {
1646         return      rxs::interval(initial, period, std::move(cn));
1647     }
1648 
1649     /*! @copydoc rx-timer.hpp
1650      */
1651     template<class... AN>
timer(rxsc::scheduler::clock_type::time_point at,AN ** ...)1652     static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1653         -> decltype(rxs::timer(at)) {
1654         return      rxs::timer(at);
1655         static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1656     }
1657     /*! @copydoc rx-timer.hpp
1658      */
1659     template<class... AN>
timer(rxsc::scheduler::clock_type::duration after,AN ** ...)1660     static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1661         -> decltype(rxs::timer(after)) {
1662         return      rxs::timer(after);
1663         static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1664     }
1665     /*! @copydoc rx-timer.hpp
1666      */
1667     template<class Coordination>
timer(rxsc::scheduler::clock_type::time_point when,Coordination cn)1668     static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1669         -> decltype(rxs::timer(when, std::move(cn))) {
1670         return      rxs::timer(when, std::move(cn));
1671     }
1672     /*! @copydoc rx-timer.hpp
1673      */
1674     template<class Coordination>
timer(rxsc::scheduler::clock_type::duration when,Coordination cn)1675     static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1676         -> decltype(rxs::timer(when, std::move(cn))) {
1677         return      rxs::timer(when, std::move(cn));
1678     }
1679 
1680     /*! @copydoc rx-iterate.hpp
1681      */
1682     template<class Collection>
iterate(Collection c)1683     static auto iterate(Collection c)
1684         -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1685         return      rxs::iterate(std::move(c), identity_current_thread());
1686     }
1687     /*! @copydoc rx-iterate.hpp
1688      */
1689     template<class Collection, class Coordination>
iterate(Collection c,Coordination cn)1690     static auto iterate(Collection c, Coordination cn)
1691         -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1692         return      rxs::iterate(std::move(c), std::move(cn));
1693     }
1694 
1695     /*! @copydoc rxcpp::sources::from()
1696      */
1697     template<class T>
from()1698     static auto from()
1699         -> decltype(    rxs::from<T>()) {
1700         return          rxs::from<T>();
1701     }
1702     /*! @copydoc rxcpp::sources::from(Coordination cn)
1703      */
1704     template<class T, class Coordination>
from(Coordination cn)1705     static auto from(Coordination cn)
1706         -> typename std::enable_if<is_coordination<Coordination>::value,
1707             decltype(   rxs::from<T>(std::move(cn)))>::type {
1708         return          rxs::from<T>(std::move(cn));
1709     }
1710     /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
1711      */
1712     template<class Value0, class... ValueN>
from(Value0 v0,ValueN...vn)1713     static auto from(Value0 v0, ValueN... vn)
1714         -> typename std::enable_if<!is_coordination<Value0>::value,
1715             decltype(   rxs::from(v0, vn...))>::type {
1716         return          rxs::from(v0, vn...);
1717     }
1718     /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
1719      */
1720     template<class Coordination, class Value0, class... ValueN>
from(Coordination cn,Value0 v0,ValueN...vn)1721     static auto from(Coordination cn, Value0 v0, ValueN... vn)
1722         -> typename std::enable_if<is_coordination<Coordination>::value,
1723             decltype(   rxs::from(std::move(cn), v0, vn...))>::type {
1724         return          rxs::from(std::move(cn), v0, vn...);
1725     }
1726 
1727     /*! @copydoc rxcpp::sources::just(Value0 v0)
1728      */
1729     template<class T>
just(T v)1730     static auto just(T v)
1731         -> decltype(rxs::just(std::move(v))) {
1732         return      rxs::just(std::move(v));
1733     }
1734     /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
1735      */
1736     template<class T, class Coordination>
just(T v,Coordination cn)1737     static auto just(T v, Coordination cn)
1738         -> decltype(rxs::just(std::move(v), std::move(cn))) {
1739         return      rxs::just(std::move(v), std::move(cn));
1740     }
1741 
1742     /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
1743      */
1744     template<class Observable, class Value0, class... ValueN>
start_with(Observable o,Value0 v0,ValueN...vn)1745     static auto start_with(Observable o, Value0 v0, ValueN... vn)
1746         -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1747         return      rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1748     }
1749 
1750     /*! @copydoc rx-empty.hpp
1751      */
1752     template<class T>
empty()1753     static auto empty()
1754         -> decltype(from<T>()) {
1755         return      from<T>();
1756     }
1757     /*! @copydoc rx-empty.hpp
1758      */
1759     template<class T, class Coordination>
empty(Coordination cn)1760     static auto empty(Coordination cn)
1761         -> decltype(from<T>(std::move(cn))) {
1762         return      from<T>(std::move(cn));
1763     }
1764 
1765     /*! @copydoc rx-error.hpp
1766      */
1767     template<class T, class Exception>
error(Exception && e)1768     static auto error(Exception&& e)
1769         -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1770         return      rxs::error<T>(std::forward<Exception>(e));
1771     }
1772     /*! @copydoc rx-error.hpp
1773      */
1774     template<class T, class Exception, class Coordination>
error(Exception && e,Coordination cn)1775     static auto error(Exception&& e, Coordination cn)
1776         -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1777         return      rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1778     }
1779 
1780     /*! @copydoc rx-scope.hpp
1781      */
1782     template<class ResourceFactory, class ObservableFactory>
scope(ResourceFactory rf,ObservableFactory of)1783     static auto scope(ResourceFactory rf, ObservableFactory of)
1784         -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1785         return      rxs::scope(std::move(rf), std::move(of));
1786     }
1787 };
1788 
1789 }
1790 
1791 //
1792 // support range() >> filter() >> subscribe() syntax
1793 // '>>' is spelled 'stream'
1794 //
1795 template<class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1796 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1797     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1798     return      source.op(std::forward<OperatorFactory>(of));
1799 }
1800 
1801 //
1802 // support range() | filter() | subscribe() syntax
1803 // '|' is spelled 'pipe'
1804 //
1805 template<class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1806 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1807     -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1808     return      source.op(std::forward<OperatorFactory>(of));
1809 }
1810 
1811 #endif
1812