1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7
8 #include "rx-includes.hpp"
9
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15
16 namespace rxcpp {
17
18 namespace detail {
19
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23 struct not_void {};
24 template<class CS, class CT>
25 static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26 template<class CS, class CT>
27 static not_void check(...);
28
29 typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30 static const bool value = std::is_same<detail_result, void>::value;
31 };
32
33 }
34
35 template<class T>
36 class dynamic_observable
37 : public rxs::source_base<T>
38 {
39 struct state_type
40 : public std::enable_shared_from_this<state_type>
41 {
42 typedef std::function<void(subscriber<T>)> onsubscribe_type;
43
44 onsubscribe_type on_subscribe;
45 };
46 std::shared_ptr<state_type> state;
47
48 template<class U>
49 friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50
51 template<class SO>
construct(SO && source,rxs::tag_source &&)52 void construct(SO&& source, rxs::tag_source&&) {
53 rxu::decay_t<SO> so = std::forward<SO>(source);
54 state->on_subscribe = [so](subscriber<T> o) mutable {
55 so.on_subscribe(std::move(o));
56 };
57 }
58
59 struct tag_function {};
60 template<class F>
construct(F && f,tag_function &&)61 void construct(F&& f, tag_function&&) {
62 state->on_subscribe = std::forward<F>(f);
63 }
64
65 public:
66
67 typedef tag_dynamic_observable dynamic_observable_tag;
68
dynamic_observable()69 dynamic_observable()
70 {
71 }
72
73 template<class SOF>
dynamic_observable(SOF && sof,typename std::enable_if<!is_dynamic_observable<SOF>::value,void ** >::type=0)74 explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75 : state(std::make_shared<state_type>())
76 {
77 construct(std::forward<SOF>(sof),
78 typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79 }
80
on_subscribe(subscriber<T> o) const81 void on_subscribe(subscriber<T> o) const {
82 state->on_subscribe(std::move(o));
83 }
84
85 template<class Subscriber>
86 typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
on_subscribe(Subscriber o) const87 on_subscribe(Subscriber o) const {
88 state->on_subscribe(o.as_dynamic());
89 }
90 };
91
92 template<class T>
operator ==(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94 return lhs.state == rhs.state;
95 }
96 template<class T>
operator !=(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98 return !(lhs == rhs);
99 }
100
101 template<class T, class Source>
make_observable_dynamic(Source && s)102 observable<T> make_observable_dynamic(Source&& s) {
103 return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113 typedef typename SO::type type;
114 typedef typename type::value_type value_type;
115 static const bool value = true;
116 typedef observable<value_type, type> observable_type;
117 template<class... AN>
makerxcpp::detail::resolve_observable118 static observable_type make(const Default&, AN&&... an) {
119 return observable_type(type(std::forward<AN>(an)...));
120 }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125 static const bool value = false;
126 typedef Default observable_type;
127 template<class... AN>
makerxcpp::detail::resolve_observable128 static observable_type make(const observable_type& that, const AN&...) {
129 return that;
130 }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135 typedef typename SO::type type;
136 typedef typename type::value_type value_type;
137 static const bool value = true;
138 typedef observable<value_type, type> observable_type;
139 template<class... AN>
makerxcpp::detail::resolve_observable140 static observable_type make(AN&&... an) {
141 return observable_type(type(std::forward<AN>(an)...));
142 }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147 static const bool value = false;
148 typedef void observable_type;
149 template<class... AN>
makerxcpp::detail::resolve_observable150 static observable_type make(const AN&...) {
151 }
152 };
153
154 }
155
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
157 struct defer_observable
158 : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161
162 /*!
163 \brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.
164
165 \ingroup group-observable
166
167 */
168 template<class T, class Observable>
169 class blocking_observable
170 {
171 template<class Obsvbl, class... ArgN>
blocking_subscribe(const Obsvbl & source,bool do_rethrow,ArgN &&...an)172 static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173 -> void {
174 std::mutex lock;
175 std::condition_variable wake;
176 bool disposed = false;
177 rxu::error_ptr error;
178
179 auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
180
181 // keep any error to rethrow at the end.
182 auto scbr = make_subscriber<T>(
183 dest,
184 [&](T t){dest.on_next(t);},
185 [&](rxu::error_ptr e){
186 if (do_rethrow) {
187 error = e;
188 } else {
189 dest.on_error(e);
190 }
191 },
192 [&](){dest.on_completed();}
193 );
194
195 auto cs = scbr.get_subscription();
196 cs.add(
197 [&](){
198 std::unique_lock<std::mutex> guard(lock);
199 wake.notify_one();
200 disposed = true;
201 });
202
203 source.subscribe(std::move(scbr));
204
205 std::unique_lock<std::mutex> guard(lock);
206 wake.wait(guard,
207 [&](){
208 return disposed;
209 });
210
211 if (error) {rxu::rethrow_exception(error);}
212 }
213
214 public:
215 typedef rxu::decay_t<Observable> observable_type;
216 observable_type source;
~blocking_observable()217 ~blocking_observable()
218 {
219 }
blocking_observable(observable_type s)220 blocking_observable(observable_type s) : source(std::move(s)) {}
221
222 ///
223 /// `subscribe` will cause this observable to emit values to the provided subscriber.
224 ///
225 /// \return void
226 ///
227 /// \param an... - the arguments are passed to make_subscriber().
228 ///
229 /// callers must provide enough arguments to make a subscriber.
230 /// overrides are supported. thus
231 /// `subscribe(thesubscriber, composite_subscription())`
232 /// will take `thesubscriber.get_observer()` and the provided
233 /// subscription and subscribe to the new subscriber.
234 /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
235 /// if a subscription or subscriber is not provided then a new subscription will be created.
236 ///
237 template<class... ArgN>
subscribe(ArgN &&...an) const238 auto subscribe(ArgN&&... an) const
239 -> void {
240 return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
241 }
242
243 ///
244 /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber.
245 ///
246 /// \note If the source observable calls on_error, the raised exception is rethrown by this method.
247 ///
248 /// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called.
249 ///
250 /// \return void
251 ///
252 /// \param an... - the arguments are passed to make_subscriber().
253 ///
254 /// callers must provide enough arguments to make a subscriber.
255 /// overrides are supported. thus
256 /// `subscribe(thesubscriber, composite_subscription())`
257 /// will take `thesubscriber.get_observer()` and the provided
258 /// subscription and subscribe to the new subscriber.
259 /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
260 /// if a subscription or subscriber is not provided then a new subscription will be created.
261 ///
262 template<class... ArgN>
subscribe_with_rethrow(ArgN &&...an) const263 auto subscribe_with_rethrow(ArgN&&... an) const
264 -> void {
265 return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
266 }
267
268 /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
269
270 \return The first item emitted by this blocking_observable.
271
272 \note If the source observable calls on_error, the raised exception is rethrown by this method.
273
274 \sample
275 When the source observable emits at least one item:
276 \snippet blocking_observable.cpp blocking first sample
277 \snippet output.txt blocking first sample
278
279 When the source observable is empty:
280 \snippet blocking_observable.cpp blocking first empty sample
281 \snippet output.txt blocking first empty sample
282 */
283 template<class... AN>
first(AN ** ...)284 auto first(AN**...) -> delayed_type_t<T, AN...> const {
285 rxu::maybe<T> result;
286 composite_subscription cs;
287 subscribe_with_rethrow(
288 cs,
289 [&](T v){result.reset(v); cs.unsubscribe();});
290 if (result.empty())
291 rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
292 return result.get();
293 static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
294 }
295
296 /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
297
298 \return The last item emitted by this blocking_observable.
299
300 \note If the source observable calls on_error, the raised exception is rethrown by this method.
301
302 \sample
303 When the source observable emits at least one item:
304 \snippet blocking_observable.cpp blocking last sample
305 \snippet output.txt blocking last sample
306
307 When the source observable is empty:
308 \snippet blocking_observable.cpp blocking last empty sample
309 \snippet output.txt blocking last empty sample
310 */
311 template<class... AN>
last(AN ** ...)312 auto last(AN**...) -> delayed_type_t<T, AN...> const {
313 rxu::maybe<T> result;
314 subscribe_with_rethrow(
315 [&](T v){result.reset(v);});
316 if (result.empty())
317 rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
318 return result.get();
319 static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
320 }
321
322 /*! Return the total number of items emitted by this blocking_observable.
323
324 \return The total number of items emitted by this blocking_observable.
325
326 \sample
327 \snippet blocking_observable.cpp blocking count sample
328 \snippet output.txt blocking count sample
329
330 When the source observable calls on_error:
331 \snippet blocking_observable.cpp blocking count error sample
332 \snippet output.txt blocking count error sample
333 */
count() const334 int count() const {
335 int result = 0;
336 source.count().as_blocking().subscribe_with_rethrow(
337 [&](int v){result = v;});
338 return result;
339 }
340
341 /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
342
343 \return The sum of all items emitted by this blocking_observable.
344
345 \sample
346 When the source observable emits at least one item:
347 \snippet blocking_observable.cpp blocking sum sample
348 \snippet output.txt blocking sum sample
349
350 When the source observable is empty:
351 \snippet blocking_observable.cpp blocking sum empty sample
352 \snippet output.txt blocking sum empty sample
353
354 When the source observable calls on_error:
355 \snippet blocking_observable.cpp blocking sum error sample
356 \snippet output.txt blocking sum error sample
357 */
sum() const358 T sum() const {
359 return source.sum().as_blocking().last();
360 }
361
362 /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
363
364 \return The average value of all items emitted by this blocking_observable.
365
366 \sample
367 When the source observable emits at least one item:
368 \snippet blocking_observable.cpp blocking average sample
369 \snippet output.txt blocking average sample
370
371 When the source observable is empty:
372 \snippet blocking_observable.cpp blocking average empty sample
373 \snippet output.txt blocking average empty sample
374
375 When the source observable calls on_error:
376 \snippet blocking_observable.cpp blocking average error sample
377 \snippet output.txt blocking average error sample
378 */
average() const379 double average() const {
380 return source.average().as_blocking().last();
381 }
382
383 /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
384
385 \return The max of all items emitted by this blocking_observable.
386
387 \sample
388 When the source observable emits at least one item:
389 \snippet blocking_observable.cpp blocking max sample
390 \snippet output.txt blocking max sample
391
392 When the source observable is empty:
393 \snippet blocking_observable.cpp blocking max empty sample
394 \snippet output.txt blocking max empty sample
395
396 When the source observable calls on_error:
397 \snippet blocking_observable.cpp blocking max error sample
398 \snippet output.txt blocking max error sample
399 */
max() const400 T max() const {
401 return source.max().as_blocking().last();
402 }
403
404 /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
405
406 \return The min of all items emitted by this blocking_observable.
407
408 \sample
409 When the source observable emits at least one item:
410 \snippet blocking_observable.cpp blocking min sample
411 \snippet output.txt blocking min sample
412
413 When the source observable is empty:
414 \snippet blocking_observable.cpp blocking min empty sample
415 \snippet output.txt blocking min empty sample
416
417 When the source observable calls on_error:
418 \snippet blocking_observable.cpp blocking min error sample
419 \snippet output.txt blocking min error sample
420 */
min() const421 T min() const {
422 return source.min().as_blocking().last();
423 }
424 };
425
426 namespace detail {
427
428 template<class SourceOperator, class Subscriber>
429 struct safe_subscriber
430 {
safe_subscriberrxcpp::detail::safe_subscriber431 safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
432
subscriberxcpp::detail::safe_subscriber433 void subscribe() {
434 RXCPP_TRY {
435 so->on_subscribe(*o);
436 } RXCPP_CATCH(...) {
437 if (!o->is_subscribed()) {
438 rxu::rethrow_current_exception();
439 }
440 o->on_error(rxu::make_error_ptr(rxu::current_exception()));
441 o->unsubscribe();
442 }
443 }
444
operator ()rxcpp::detail::safe_subscriber445 void operator()(const rxsc::schedulable&) {
446 subscribe();
447 }
448
449 SourceOperator* so;
450 Subscriber* o;
451 };
452
453 }
454
455 template<>
456 class observable<void, void>;
457
458 /*!
459 \defgroup group-observable Observables
460
461 \brief These are the set of observable classes in rxcpp.
462
463 \class rxcpp::observable
464
465 \ingroup group-observable group-core
466
467 \brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
468
469 \par Some code
470 This sample will observable::subscribe() to values from a observable<void, void>::range().
471
472 \sample
473 \snippet range.cpp range sample
474 \snippet output.txt range sample
475
476 */
477 template<class T, class SourceOperator>
478 class observable
479 : public observable_base<T>
480 {
481 static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
482
483 typedef observable<T, SourceOperator> this_type;
484
485 public:
486 typedef rxu::decay_t<SourceOperator> source_operator_type;
487 mutable source_operator_type source_operator;
488
489 private:
490
491 template<class U, class SO>
492 friend class observable;
493
494 template<class U, class SO>
495 friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
496
497 template<class Subscriber>
detail_subscribe(Subscriber o) const498 auto detail_subscribe(Subscriber o) const
499 -> composite_subscription {
500
501 typedef rxu::decay_t<Subscriber> subscriber_type;
502
503 static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
504 static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
505 static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
506
507 trace_activity().subscribe_enter(*this, o);
508
509 if (!o.is_subscribed()) {
510 trace_activity().subscribe_return(*this);
511 return o.get_subscription();
512 }
513
514 detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
515
516 // make sure to let current_thread take ownership of the thread as early as possible.
517 if (rxsc::current_thread::is_schedule_required()) {
518 const auto& sc = rxsc::make_current_thread();
519 sc.create_worker(o.get_subscription()).schedule(subscriber);
520 } else {
521 // current_thread already owns this thread.
522 subscriber.subscribe();
523 }
524
525 trace_activity().subscribe_return(*this);
526 return o.get_subscription();
527 }
528
529 public:
530 typedef T value_type;
531
532 static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
533
~observable()534 ~observable()
535 {
536 }
537
observable()538 observable()
539 {
540 }
541
observable(const source_operator_type & o)542 explicit observable(const source_operator_type& o)
543 : source_operator(o)
544 {
545 }
observable(source_operator_type && o)546 explicit observable(source_operator_type&& o)
547 : source_operator(std::move(o))
548 {
549 }
550
551 /// implicit conversion between observables of the same value_type
552 template<class SO>
observable(const observable<T,SO> & o)553 observable(const observable<T, SO>& o)
554 : source_operator(o.source_operator)
555 {}
556 /// implicit conversion between observables of the same value_type
557 template<class SO>
observable(observable<T,SO> && o)558 observable(observable<T, SO>&& o)
559 : source_operator(std::move(o.source_operator))
560 {}
561
562 #if 0
563 template<class I>
564 void on_subscribe(observer<T, I> o) const {
565 source_operator.on_subscribe(o);
566 }
567 #endif
568
569 /*! @copydoc rxcpp::operators::as_dynamic
570 */
571 template<class... AN>
as_dynamic(AN ** ...) const572 observable<T> as_dynamic(AN**...) const {
573 return *this;
574 static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
575 }
576
577 /*! @copydoc rx-ref_count.hpp
578 */
579 template<class... AN>
ref_count(AN...an) const580 auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
581 /// \cond SHOW_SERVICE_MEMBERS
582 -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
583 /// \endcond
584 {
585 return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
586 }
587
588 /*! @copydoc rxcpp::operators::as_blocking
589 */
590 template<class... AN>
as_blocking(AN ** ...) const591 blocking_observable<T, this_type> as_blocking(AN**...) const {
592 return blocking_observable<T, this_type>(*this);
593 static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
594 }
595
596 /// \cond SHOW_SERVICE_MEMBERS
597
598 ///
599 /// takes any function that will take this observable and produce a result value.
600 /// this is intended to allow externally defined operators, that use subscribe,
601 /// to be connected into the expression.
602 ///
603 template<class OperatorFactory>
op(OperatorFactory && of) const604 auto op(OperatorFactory&& of) const
605 -> decltype(of(*(const this_type*)nullptr)) {
606 return of(*this);
607 static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
608 }
609
610 /*! @copydoc rx-lift.hpp
611 */
612 template<class ResultType, class Operator>
lift(Operator && op) const613 auto lift(Operator&& op) const
614 -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
615 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
616 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
617 static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
618 }
619
620 ///
621 /// takes any function that will take a subscriber for this observable and produce a subscriber.
622 /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
623 /// into the expression.
624 ///
625 template<class ResultType, class Operator>
lift_if(Operator && op) const626 auto lift_if(Operator&& op) const
627 -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
628 observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
629 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
630 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
631 }
632 ///
633 /// takes any function that will take a subscriber for this observable and produce a subscriber.
634 /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
635 /// into the expression.
636 ///
637 template<class ResultType, class Operator>
lift_if(Operator &&) const638 auto lift_if(Operator&&) const
639 -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
640 decltype(rxs::from<ResultType>())>::type {
641 return rxs::from<ResultType>();
642 }
643 /// \endcond
644
645 /*! @copydoc rx-subscribe.hpp
646 */
647 template<class... ArgN>
subscribe(ArgN &&...an) const648 auto subscribe(ArgN&&... an) const
649 -> composite_subscription {
650 return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
651 }
652
653 /*! @copydoc rx-all.hpp
654 */
655 template<class... AN>
all(AN &&...an) const656 auto all(AN&&... an) const
657 /// \cond SHOW_SERVICE_MEMBERS
658 -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
659 /// \endcond
660 {
661 return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
662 }
663
664 /*! @copydoc rxcpp::operators::is_empty
665 */
666 template<class... AN>
is_empty(AN &&...an) const667 auto is_empty(AN&&... an) const
668 /// \cond SHOW_SERVICE_MEMBERS
669 -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
670 /// \endcond
671 {
672 return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
673 }
674
675 /*! @copydoc rx-any.hpp
676 */
677 template<class... AN>
any(AN &&...an) const678 auto any(AN&&... an) const
679 /// \cond SHOW_SERVICE_MEMBERS
680 -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
681 /// \endcond
682 {
683 return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
684 }
685
686 /*! @copydoc rxcpp::operators::exists
687 */
688 template<class... AN>
exists(AN &&...an) const689 auto exists(AN&&... an) const
690 /// \cond SHOW_SERVICE_MEMBERS
691 -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
692 /// \endcond
693 {
694 return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
695 }
696
697 /*! @copydoc rxcpp::operators::contains
698 */
699 template<class... AN>
contains(AN &&...an) const700 auto contains(AN&&... an) const
701 /// \cond SHOW_SERVICE_MEMBERS
702 -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
703 /// \endcond
704 {
705 return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
706 }
707
708 /*! @copydoc rx-filter.hpp
709 */
710 template<class... AN>
filter(AN &&...an) const711 auto filter(AN&&... an) const
712 /// \cond SHOW_SERVICE_MEMBERS
713 -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
714 /// \endcond
715 {
716 return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
717 }
718
719 /*! @copydoc rx-switch_if_empty.hpp
720 */
721 template<class... AN>
switch_if_empty(AN &&...an) const722 auto switch_if_empty(AN&&... an) const
723 /// \cond SHOW_SERVICE_MEMBERS
724 -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
725 /// \endcond
726 {
727 return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
728 }
729
730 /*! @copydoc rxcpp::operators::default_if_empty
731 */
732 template<class... AN>
default_if_empty(AN &&...an) const733 auto default_if_empty(AN&&... an) const
734 /// \cond SHOW_SERVICE_MEMBERS
735 -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
736 /// \endcond
737 {
738 return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
739 }
740
741 /*! @copydoc rx-sequence_equal.hpp
742 */
743 template<class... AN>
sequence_equal(AN...an) const744 auto sequence_equal(AN... an) const
745 /// \cond SHOW_SERVICE_MEMBERS
746 -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
747 /// \endcond
748 {
749 return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
750 }
751
752 /*! @copydoc rx-tap.hpp
753 */
754 template<class... AN>
tap(AN &&...an) const755 auto tap(AN&&... an) const
756 /// \cond SHOW_SERVICE_MEMBERS
757 -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
758 /// \endcond
759 {
760 return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
761 }
762
763 /*! @copydoc rx-time_interval.hpp
764 */
765 template<class... AN>
time_interval(AN &&...an) const766 auto time_interval(AN&&... an) const
767 /// \cond SHOW_SERVICE_MEMBERS
768 -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
769 /// \endcond
770 {
771 return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
772 }
773
774 /*! @copydoc rx-timeout.hpp
775 */
776 template<class... AN>
timeout(AN &&...an) const777 auto timeout(AN&&... an) const
778 /// \cond SHOW_SERVICE_MEMBERS
779 -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
780 /// \endcond
781 {
782 return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
783 }
784
785 /*! @copydoc rx-timestamp.hpp
786 */
787 template<class... AN>
timestamp(AN &&...an) const788 auto timestamp(AN&&... an) const
789 /// \cond SHOW_SERVICE_MEMBERS
790 -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
791 /// \endcond
792 {
793 return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
794 }
795
796 /*! @copydoc rx-finally.hpp
797 */
798 template<class... AN>
finally(AN &&...an) const799 auto finally(AN&&... an) const
800 /// \cond SHOW_SERVICE_MEMBERS
801 -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
802 /// \endcond
803 {
804 return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
805 }
806
807 /*! @copydoc rx-on_error_resume_next.hpp
808 */
809 template<class... AN>
on_error_resume_next(AN &&...an) const810 auto on_error_resume_next(AN&&... an) const
811 /// \cond SHOW_SERVICE_MEMBERS
812 -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
813 /// \endcond
814 {
815 return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
816 }
817
818 /*! @copydoc rx-on_error_resume_next.hpp
819 */
820 template<class... AN>
switch_on_error(AN &&...an) const821 auto switch_on_error(AN&&... an) const
822 /// \cond SHOW_SERVICE_MEMBERS
823 -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
824 /// \endcond
825 {
826 return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
827 }
828
829 /*! @copydoc rx-map.hpp
830 */
831 template<class... AN>
map(AN &&...an) const832 auto map(AN&&... an) const
833 /// \cond SHOW_SERVICE_MEMBERS
834 -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
835 /// \endcond
836 {
837 return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
838 }
839
840 /*! @copydoc rx-map.hpp
841 */
842 template<class... AN>
transform(AN &&...an) const843 auto transform(AN&&... an) const
844 /// \cond SHOW_SERVICE_MEMBERS
845 -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
846 /// \endcond
847 {
848 return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
849 }
850
851 /*! @copydoc rx-debounce.hpp
852 */
853 template<class... AN>
debounce(AN &&...an) const854 auto debounce(AN&&... an) const
855 /// \cond SHOW_SERVICE_MEMBERS
856 -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
857 /// \endcond
858 {
859 return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
860 }
861
862 /*! @copydoc rx-delay.hpp
863 */
864 template<class... AN>
delay(AN &&...an) const865 auto delay(AN&&... an) const
866 /// \cond SHOW_SERVICE_MEMBERS
867 -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
868 /// \endcond
869 {
870 return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
871 }
872
873 /*! @copydoc rx-distinct.hpp
874 */
875 template<class... AN>
distinct(AN &&...an) const876 auto distinct(AN&&... an) const
877 /// \cond SHOW_SERVICE_MEMBERS
878 -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
879 /// \endcond
880 {
881 return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
882 }
883
884 /*! @copydoc rx-distinct_until_changed.hpp
885 */
886 template<class... AN>
distinct_until_changed(AN &&...an) const887 auto distinct_until_changed(AN&&... an) const
888 /// \cond SHOW_SERVICE_MEMBERS
889 -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
890 /// \endcond
891 {
892 return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
893 }
894
895 /*! @copydoc rx-element_at.hpp
896 */
897 template<class... AN>
element_at(AN &&...an) const898 auto element_at(AN&&... an) const
899 /// \cond SHOW_SERVICE_MEMBERS
900 -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
901 /// \endcond
902 {
903 return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
904 }
905
906 /*! @copydoc rx-window.hpp
907 */
908 template<class... AN>
window(AN &&...an) const909 auto window(AN&&... an) const
910 /// \cond SHOW_SERVICE_MEMBERS
911 -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
912 /// \endcond
913 {
914 return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
915 }
916
917 /*! @copydoc rx-window_time.hpp
918 */
919 template<class... AN>
window_with_time(AN &&...an) const920 auto window_with_time(AN&&... an) const
921 /// \cond SHOW_SERVICE_MEMBERS
922 -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
923 /// \endcond
924 {
925 return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
926 }
927
928 /*! @copydoc rx-window_time_count.hpp
929 */
930 template<class... AN>
window_with_time_or_count(AN &&...an) const931 auto window_with_time_or_count(AN&&... an) const
932 /// \cond SHOW_SERVICE_MEMBERS
933 -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
934 /// \endcond
935 {
936 return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
937 }
938
939 /*! @copydoc rx-window_toggle.hpp
940 */
941 template<class... AN>
window_toggle(AN &&...an) const942 auto window_toggle(AN&&... an) const
943 /// \cond SHOW_SERVICE_MEMBERS
944 -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
945 /// \endcond
946 {
947 return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
948 }
949
950 /*! @copydoc rx-buffer_count.hpp
951 */
952 template<class... AN>
buffer(AN &&...an) const953 auto buffer(AN&&... an) const
954 /// \cond SHOW_SERVICE_MEMBERS
955 -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
956 /// \endcond
957 {
958 return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
959 }
960
961 /*! @copydoc rx-buffer_time.hpp
962 */
963 template<class... AN>
buffer_with_time(AN &&...an) const964 auto buffer_with_time(AN&&... an) const
965 /// \cond SHOW_SERVICE_MEMBERS
966 -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
967 /// \endcond
968 {
969 return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
970 }
971
972 /*! @copydoc rx-buffer_time_count.hpp
973 */
974 template<class... AN>
buffer_with_time_or_count(AN &&...an) const975 auto buffer_with_time_or_count(AN&&... an) const
976 /// \cond SHOW_SERVICE_MEMBERS
977 -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
978 /// \endcond
979 {
980 return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
981 }
982
983 /*! @copydoc rx-switch_on_next.hpp
984 */
985 template<class... AN>
switch_on_next(AN &&...an) const986 auto switch_on_next(AN&&... an) const
987 /// \cond SHOW_SERVICE_MEMBERS
988 -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
989 /// \endcond
990 {
991 return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
992 }
993
994 /*! @copydoc rx-merge.hpp
995 */
996 template<class... AN>
merge(AN...an) const997 auto merge(AN... an) const
998 /// \cond SHOW_SERVICE_MEMBERS
999 -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1000 /// \endcond
1001 {
1002 return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
1003 }
1004
1005 /*! @copydoc rx-merge_delay_error.hpp
1006 */
1007 template<class... AN>
merge_delay_error(AN...an) const1008 auto merge_delay_error(AN... an) const
1009 /// \cond SHOW_SERVICE_MEMBERS
1010 -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1011 /// \endcond
1012 {
1013 return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
1014 }
1015
1016 /*! @copydoc rx-amb.hpp
1017 */
1018 template<class... AN>
amb(AN...an) const1019 auto amb(AN... an) const
1020 /// \cond SHOW_SERVICE_MEMBERS
1021 -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1022 /// \endcond
1023 {
1024 return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1025 }
1026
1027 /*! @copydoc rx-flat_map.hpp
1028 */
1029 template<class... AN>
flat_map(AN &&...an) const1030 auto flat_map(AN&&... an) const
1031 /// \cond SHOW_SERVICE_MEMBERS
1032 -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1033 /// \endcond
1034 {
1035 return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1036 }
1037
1038 /*! @copydoc rx-flat_map.hpp
1039 */
1040 template<class... AN>
merge_transform(AN &&...an) const1041 auto merge_transform(AN&&... an) const
1042 /// \cond SHOW_SERVICE_MEMBERS
1043 -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1044 /// \endcond
1045 {
1046 return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1047 }
1048
1049 /*! @copydoc rx-concat.hpp
1050 */
1051 template<class... AN>
concat(AN...an) const1052 auto concat(AN... an) const
1053 /// \cond SHOW_SERVICE_MEMBERS
1054 -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1055 /// \endcond
1056 {
1057 return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1058 }
1059
1060 /*! @copydoc rx-concat_map.hpp
1061 */
1062 template<class... AN>
concat_map(AN &&...an) const1063 auto concat_map(AN&&... an) const
1064 /// \cond SHOW_SERVICE_MEMBERS
1065 -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1066 /// \endcond
1067 {
1068 return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1069 }
1070
1071 /*! @copydoc rx-concat_map.hpp
1072 */
1073 template<class... AN>
concat_transform(AN &&...an) const1074 auto concat_transform(AN&&... an) const
1075 /// \cond SHOW_SERVICE_MEMBERS
1076 -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1077 /// \endcond
1078 {
1079 return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1080 }
1081
1082 /*! @copydoc rx-with_latest_from.hpp
1083 */
1084 template<class... AN>
with_latest_from(AN...an) const1085 auto with_latest_from(AN... an) const
1086 /// \cond SHOW_SERVICE_MEMBERS
1087 -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1088 /// \endcond
1089 {
1090 return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1091 }
1092
1093
1094 /*! @copydoc rx-combine_latest.hpp
1095 */
1096 template<class... AN>
combine_latest(AN...an) const1097 auto combine_latest(AN... an) const
1098 /// \cond SHOW_SERVICE_MEMBERS
1099 -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1100 /// \endcond
1101 {
1102 return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1103 }
1104
1105 /*! @copydoc rx-zip.hpp
1106 */
1107 template<class... AN>
zip(AN &&...an) const1108 auto zip(AN&&... an) const
1109 /// \cond SHOW_SERVICE_MEMBERS
1110 -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1111 /// \endcond
1112 {
1113 return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1114 }
1115
1116 /*! @copydoc rx-group_by.hpp
1117 */
1118 template<class... AN>
group_by(AN &&...an) const1119 inline auto group_by(AN&&... an) const
1120 /// \cond SHOW_SERVICE_MEMBERS
1121 -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1122 /// \endcond
1123 {
1124 return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1125 }
1126
1127 /*! @copydoc rx-ignore_elements.hpp
1128 */
1129 template<class... AN>
ignore_elements(AN &&...an) const1130 auto ignore_elements(AN&&... an) const
1131 /// \cond SHOW_SERVICE_MEMBERS
1132 -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1133 /// \endcond
1134 {
1135 return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1136 }
1137
1138 /*! @copydoc rx-muticast.hpp
1139 */
1140 template<class... AN>
multicast(AN &&...an) const1141 auto multicast(AN&&... an) const
1142 /// \cond SHOW_SERVICE_MEMBERS
1143 -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1144 /// \endcond
1145 {
1146 return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1147 }
1148
1149 /*! @copydoc rx-publish.hpp
1150 */
1151 template<class... AN>
publish(AN &&...an) const1152 auto publish(AN&&... an) const
1153 /// \cond SHOW_SERVICE_MEMBERS
1154 -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1155 /// \endcond
1156 {
1157 return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1158 }
1159
1160 /*! @copydoc rxcpp::operators::publish_synchronized
1161 */
1162 template<class... AN>
publish_synchronized(AN &&...an) const1163 auto publish_synchronized(AN&&... an) const
1164 /// \cond SHOW_SERVICE_MEMBERS
1165 -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1166 /// \endcond
1167 {
1168 return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1169 }
1170
1171 /*! @copydoc rx-replay.hpp
1172 */
1173 template<class... AN>
replay(AN &&...an) const1174 auto replay(AN&&... an) const
1175 /// \cond SHOW_SERVICE_MEMBERS
1176 -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1177 /// \endcond
1178 {
1179 return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1180 }
1181
1182 /*! @copydoc rx-subscribe_on.hpp
1183 */
1184 template<class... AN>
subscribe_on(AN &&...an) const1185 auto subscribe_on(AN&&... an) const
1186 /// \cond SHOW_SERVICE_MEMBERS
1187 -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1188 /// \endcond
1189 {
1190 return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1191 }
1192
1193 /*! @copydoc rx-observe_on.hpp
1194 */
1195 template<class... AN>
observe_on(AN &&...an) const1196 auto observe_on(AN&&... an) const
1197 /// \cond SHOW_SERVICE_MEMBERS
1198 -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1199 /// \endcond
1200 {
1201 return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1202 }
1203
1204 /*! @copydoc rx-reduce.hpp
1205 */
1206 template<class... AN>
reduce(AN &&...an) const1207 auto reduce(AN&&... an) const
1208 /// \cond SHOW_SERVICE_MEMBERS
1209 -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1210 /// \endcond
1211 {
1212 return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1213 }
1214
1215 /*! @copydoc rx-reduce.hpp
1216 */
1217 template<class... AN>
accumulate(AN &&...an) const1218 auto accumulate(AN&&... an) const
1219 /// \cond SHOW_SERVICE_MEMBERS
1220 -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1221 /// \endcond
1222 {
1223 return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1224 }
1225
1226 /*! @copydoc rxcpp::operators::first
1227 */
1228 template<class... AN>
first(AN ** ...) const1229 auto first(AN**...) const
1230 /// \cond SHOW_SERVICE_MEMBERS
1231 -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr))
1232 /// \endcond
1233 {
1234 return observable_member(delayed_type<first_tag, AN...>::value(), *this);
1235 static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1236 }
1237
1238 /*! @copydoc rxcpp::operators::last
1239 */
1240 template<class... AN>
last(AN ** ...) const1241 auto last(AN**...) const
1242 /// \cond SHOW_SERVICE_MEMBERS
1243 -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr))
1244 /// \endcond
1245 {
1246 return observable_member(delayed_type<last_tag, AN...>::value(), *this);
1247 static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1248 }
1249
1250 /*! @copydoc rxcpp::operators::count
1251 */
1252 template<class... AN>
count(AN ** ...) const1253 auto count(AN**...) const
1254 /// \cond SHOW_SERVICE_MEMBERS
1255 -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>()))
1256 /// \endcond
1257 {
1258 return observable_member(delayed_type<reduce_tag, AN...>::value(), *this, 0, rxu::count(), identity_for<int>());
1259 static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1260 }
1261
1262 /*! @copydoc rxcpp::operators::sum
1263 */
1264 template<class... AN>
sum(AN ** ...) const1265 auto sum(AN**...) const
1266 /// \cond SHOW_SERVICE_MEMBERS
1267 -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr))
1268 /// \endcond
1269 {
1270 return observable_member(delayed_type<sum_tag, AN...>::value(), *this);
1271 static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1272 }
1273
1274 /*! @copydoc rxcpp::operators::average
1275 */
1276 template<class... AN>
average(AN ** ...) const1277 auto average(AN**...) const
1278 /// \cond SHOW_SERVICE_MEMBERS
1279 -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr))
1280 /// \endcond
1281 {
1282 return observable_member(delayed_type<average_tag, AN...>::value(), *this);
1283 static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1284 }
1285
1286 /*! @copydoc rxcpp::operators::max
1287 */
1288 template<class... AN>
max(AN ** ...) const1289 auto max(AN**...) const
1290 /// \cond SHOW_SERVICE_MEMBERS
1291 -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr))
1292 /// \endcond
1293 {
1294 return observable_member(delayed_type<max_tag, AN...>::value(), *this);
1295 static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1296 }
1297
1298 /*! @copydoc rxcpp::operators::min
1299 */
1300 template<class... AN>
min(AN ** ...) const1301 auto min(AN**...) const
1302 /// \cond SHOW_SERVICE_MEMBERS
1303 -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr))
1304 /// \endcond
1305 {
1306 return observable_member(delayed_type<min_tag, AN...>::value(), *this);
1307 static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1308 }
1309
1310 /*! @copydoc rx-scan.hpp
1311 */
1312 template<class... AN>
scan(AN...an) const1313 auto scan(AN... an) const
1314 /// \cond SHOW_SERVICE_MEMBERS
1315 -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1316 /// \endcond
1317 {
1318 return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1319 }
1320
1321 /*! @copydoc rx-sample_time.hpp
1322 */
1323 template<class... AN>
sample_with_time(AN &&...an) const1324 auto sample_with_time(AN&&... an) const
1325 /// \cond SHOW_SERVICE_MEMBERS
1326 -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1327 /// \endcond
1328 {
1329 return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1330 }
1331
1332 /*! @copydoc rx-skip.hpp
1333 */
1334 template<class... AN>
skip(AN...an) const1335 auto skip(AN... an) const
1336 /// \cond SHOW_SERVICE_MEMBERS
1337 -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1338 /// \endcond
1339 {
1340 return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1341 }
1342
1343 /*! @copydoc rx-skip.hpp
1344 */
1345 template<class... AN>
skip_while(AN...an) const1346 auto skip_while(AN... an) const
1347 /// \cond SHOW_SERVICE_MEMBERS
1348 -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1349 /// \endcond
1350 {
1351 return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
1352 }
1353
1354 /*! @copydoc rx-skip_last.hpp
1355 */
1356 template<class... AN>
skip_last(AN...an) const1357 auto skip_last(AN... an) const
1358 /// \cond SHOW_SERVICE_MEMBERS
1359 -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1360 /// \endcond
1361 {
1362 return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1363 }
1364
1365 /*! @copydoc rx-skip_until.hpp
1366 */
1367 template<class... AN>
skip_until(AN...an) const1368 auto skip_until(AN... an) const
1369 /// \cond SHOW_SERVICE_MEMBERS
1370 -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1371 /// \endcond
1372 {
1373 return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1374 }
1375
1376 /*! @copydoc rx-take.hpp
1377 */
1378 template<class... AN>
take(AN...an) const1379 auto take(AN... an) const
1380 /// \cond SHOW_SERVICE_MEMBERS
1381 -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1382 /// \endcond
1383 {
1384 return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1385 }
1386
1387 /*! @copydoc rx-take_last.hpp
1388 */
1389 template<class... AN>
take_last(AN &&...an) const1390 auto take_last(AN&&... an) const
1391 /// \cond SHOW_SERVICE_MEMBERS
1392 -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1393 /// \endcond
1394 {
1395 return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1396 }
1397
1398 /*! @copydoc rx-take_until.hpp
1399 */
1400 template<class... AN>
take_until(AN &&...an) const1401 auto take_until(AN&&... an) const
1402 /// \cond SHOW_SERVICE_MEMBERS
1403 -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1404 /// \endcond
1405 {
1406 return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1407 }
1408
1409 /*! @copydoc rx-take_while.hpp
1410 */
1411 template<class... AN>
take_while(AN &&...an) const1412 auto take_while(AN&&... an) const
1413 /// \cond SHOW_SERVICE_MEMBERS
1414 -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1415 /// \endcond
1416 {
1417 return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1418 }
1419
1420 /*! @copydoc rx-repeat.hpp
1421 */
1422 template<class... AN>
repeat(AN...an) const1423 auto repeat(AN... an) const
1424 /// \cond SHOW_SERVICE_MEMBERS
1425 -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1426 /// \endcond
1427 {
1428 return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1429 }
1430
1431 /*! @copydoc rx-retry.hpp
1432 */
1433 template<class... AN>
retry(AN...an) const1434 auto retry(AN... an) const
1435 /// \cond SHOW_SERVICE_MEMBERS
1436 -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1437 /// \endcond
1438 {
1439 return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1440 }
1441
1442 /*! @copydoc rx-start_with.hpp
1443 */
1444 template<class... AN>
start_with(AN...an) const1445 auto start_with(AN... an) const
1446 /// \cond SHOW_SERVICE_MEMBERS
1447 -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1448 /// \endcond
1449 {
1450 return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1451 }
1452
1453 /*! @copydoc rx-pairwise.hpp
1454 */
1455 template<class... AN>
pairwise(AN...an) const1456 auto pairwise(AN... an) const
1457 /// \cond SHOW_SERVICE_MEMBERS
1458 -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1459 /// \endcond
1460 {
1461 return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1462 }
1463 };
1464
1465 template<class T, class SourceOperator>
operator ==(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1466 inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1467 return lhs.source_operator == rhs.source_operator;
1468 }
1469 template<class T, class SourceOperator>
operator !=(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1470 inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1471 return !(lhs == rhs);
1472 }
1473
1474 /*!
1475 \defgroup group-core Basics
1476
1477 \brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled.
1478
1479 \class rxcpp::observable<void, void>
1480
1481 \brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable.
1482
1483 \ingroup group-core
1484
1485 \par Create a new type of observable
1486
1487 \sample
1488 \snippet create.cpp Create sample
1489 \snippet output.txt Create sample
1490
1491 \par Create an observable that emits a range of values
1492
1493 \sample
1494 \snippet range.cpp range sample
1495 \snippet output.txt range sample
1496
1497 \par Create an observable that emits nothing / generates an error / immediately completes
1498
1499 \sample
1500 \snippet never.cpp never sample
1501 \snippet output.txt never sample
1502 \snippet error.cpp error sample
1503 \snippet output.txt error sample
1504 \snippet empty.cpp empty sample
1505 \snippet output.txt empty sample
1506
1507 \par Create an observable that generates new observable for each subscriber
1508
1509 \sample
1510 \snippet defer.cpp defer sample
1511 \snippet output.txt defer sample
1512
1513 \par Create an observable that emits items every specified interval of time
1514
1515 \sample
1516 \snippet interval.cpp interval sample
1517 \snippet output.txt interval sample
1518
1519 \par Create an observable that emits items in the specified interval of time
1520
1521 \sample
1522 \snippet timer.cpp duration timer sample
1523 \snippet output.txt duration timer sample
1524
1525 \par Create an observable that emits all items from a collection
1526
1527 \sample
1528 \snippet iterate.cpp iterate sample
1529 \snippet output.txt iterate sample
1530
1531 \par Create an observable that emits a set of specified items
1532
1533 \sample
1534 \snippet from.cpp from sample
1535 \snippet output.txt from sample
1536
1537 \par Create an observable that emits a single item
1538
1539 \sample
1540 \snippet just.cpp just sample
1541 \snippet output.txt just sample
1542
1543 \par Create an observable that emits a set of items and then subscribes to another observable
1544
1545 \sample
1546 \snippet start_with.cpp full start_with sample
1547 \snippet output.txt full start_with sample
1548
1549 \par Create an observable that generates a new observable based on a generated resource for each subscriber
1550
1551 \sample
1552 \snippet scope.cpp scope sample
1553 \snippet output.txt scope sample
1554
1555 */
1556 template<>
1557 class observable<void, void>
1558 {
1559 ~observable();
1560 public:
1561 /*! @copydoc rx-create.hpp
1562 */
1563 template<class T, class OnSubscribe>
create(OnSubscribe os)1564 static auto create(OnSubscribe os)
1565 -> decltype(rxs::create<T>(std::move(os))) {
1566 return rxs::create<T>(std::move(os));
1567 }
1568
1569 /*! @copydoc rx-range.hpp
1570 */
1571 template<class T>
range(T first=0,T last=std::numeric_limits<T>::max (),std::ptrdiff_t step=1)1572 static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1573 -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1574 return rxs::range<T>(first, last, step, identity_current_thread());
1575 }
1576 /*! @copydoc rx-range.hpp
1577 */
1578 template<class T, class Coordination>
range(T first,T last,std::ptrdiff_t step,Coordination cn)1579 static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1580 -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1581 return rxs::range<T>(first, last, step, std::move(cn));
1582 }
1583 /*! @copydoc rx-range.hpp
1584 */
1585 template<class T, class Coordination>
range(T first,T last,Coordination cn)1586 static auto range(T first, T last, Coordination cn)
1587 -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1588 return rxs::range<T>(first, last, std::move(cn));
1589 }
1590 /*! @copydoc rx-range.hpp
1591 */
1592 template<class T, class Coordination>
range(T first,Coordination cn)1593 static auto range(T first, Coordination cn)
1594 -> decltype(rxs::range<T>(first, std::move(cn))) {
1595 return rxs::range<T>(first, std::move(cn));
1596 }
1597
1598 /*! @copydoc rx-never.hpp
1599 */
1600 template<class T>
never()1601 static auto never()
1602 -> decltype(rxs::never<T>()) {
1603 return rxs::never<T>();
1604 }
1605
1606 /*! @copydoc rx-defer.hpp
1607 */
1608 template<class ObservableFactory>
defer(ObservableFactory of)1609 static auto defer(ObservableFactory of)
1610 -> decltype(rxs::defer(std::move(of))) {
1611 return rxs::defer(std::move(of));
1612 }
1613
1614 /*! @copydoc rx-interval.hpp
1615 */
1616 template<class... AN>
interval(rxsc::scheduler::clock_type::duration period,AN ** ...)1617 static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1618 -> decltype(rxs::interval(period)) {
1619 return rxs::interval(period);
1620 static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1621 }
1622 /*! @copydoc rx-interval.hpp
1623 */
1624 template<class Coordination>
interval(rxsc::scheduler::clock_type::duration period,Coordination cn)1625 static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1626 -> decltype(rxs::interval(period, std::move(cn))) {
1627 return rxs::interval(period, std::move(cn));
1628 }
1629 /*! @copydoc rx-interval.hpp
1630 */
1631 template<class... AN>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,AN ** ...)1632 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1633 -> decltype(rxs::interval(initial, period)) {
1634 return rxs::interval(initial, period);
1635 static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1636 }
1637 /*! @copydoc rx-interval.hpp
1638 */
1639 template<class Coordination>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,Coordination cn)1640 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1641 -> decltype(rxs::interval(initial, period, std::move(cn))) {
1642 return rxs::interval(initial, period, std::move(cn));
1643 }
1644
1645 /*! @copydoc rx-timer.hpp
1646 */
1647 template<class... AN>
timer(rxsc::scheduler::clock_type::time_point at,AN ** ...)1648 static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1649 -> decltype(rxs::timer(at)) {
1650 return rxs::timer(at);
1651 static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1652 }
1653 /*! @copydoc rx-timer.hpp
1654 */
1655 template<class... AN>
timer(rxsc::scheduler::clock_type::duration after,AN ** ...)1656 static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1657 -> decltype(rxs::timer(after)) {
1658 return rxs::timer(after);
1659 static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1660 }
1661 /*! @copydoc rx-timer.hpp
1662 */
1663 template<class Coordination>
timer(rxsc::scheduler::clock_type::time_point when,Coordination cn)1664 static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1665 -> decltype(rxs::timer(when, std::move(cn))) {
1666 return rxs::timer(when, std::move(cn));
1667 }
1668 /*! @copydoc rx-timer.hpp
1669 */
1670 template<class Coordination>
timer(rxsc::scheduler::clock_type::duration when,Coordination cn)1671 static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1672 -> decltype(rxs::timer(when, std::move(cn))) {
1673 return rxs::timer(when, std::move(cn));
1674 }
1675
1676 /*! @copydoc rx-iterate.hpp
1677 */
1678 template<class Collection>
iterate(Collection c)1679 static auto iterate(Collection c)
1680 -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1681 return rxs::iterate(std::move(c), identity_current_thread());
1682 }
1683 /*! @copydoc rx-iterate.hpp
1684 */
1685 template<class Collection, class Coordination>
iterate(Collection c,Coordination cn)1686 static auto iterate(Collection c, Coordination cn)
1687 -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1688 return rxs::iterate(std::move(c), std::move(cn));
1689 }
1690
1691 /*! @copydoc rxcpp::sources::from()
1692 */
1693 template<class T>
from()1694 static auto from()
1695 -> decltype( rxs::from<T>()) {
1696 return rxs::from<T>();
1697 }
1698 /*! @copydoc rxcpp::sources::from(Coordination cn)
1699 */
1700 template<class T, class Coordination>
from(Coordination cn)1701 static auto from(Coordination cn)
1702 -> typename std::enable_if<is_coordination<Coordination>::value,
1703 decltype( rxs::from<T>(std::move(cn)))>::type {
1704 return rxs::from<T>(std::move(cn));
1705 }
1706 /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
1707 */
1708 template<class Value0, class... ValueN>
from(Value0 v0,ValueN...vn)1709 static auto from(Value0 v0, ValueN... vn)
1710 -> typename std::enable_if<!is_coordination<Value0>::value,
1711 decltype( rxs::from(v0, vn...))>::type {
1712 return rxs::from(v0, vn...);
1713 }
1714 /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
1715 */
1716 template<class Coordination, class Value0, class... ValueN>
from(Coordination cn,Value0 v0,ValueN...vn)1717 static auto from(Coordination cn, Value0 v0, ValueN... vn)
1718 -> typename std::enable_if<is_coordination<Coordination>::value,
1719 decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1720 return rxs::from(std::move(cn), v0, vn...);
1721 }
1722
1723 /*! @copydoc rxcpp::sources::just(Value0 v0)
1724 */
1725 template<class T>
just(T v)1726 static auto just(T v)
1727 -> decltype(rxs::just(std::move(v))) {
1728 return rxs::just(std::move(v));
1729 }
1730 /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
1731 */
1732 template<class T, class Coordination>
just(T v,Coordination cn)1733 static auto just(T v, Coordination cn)
1734 -> decltype(rxs::just(std::move(v), std::move(cn))) {
1735 return rxs::just(std::move(v), std::move(cn));
1736 }
1737
1738 /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
1739 */
1740 template<class Observable, class Value0, class... ValueN>
start_with(Observable o,Value0 v0,ValueN...vn)1741 static auto start_with(Observable o, Value0 v0, ValueN... vn)
1742 -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1743 return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1744 }
1745
1746 /*! @copydoc rx-empty.hpp
1747 */
1748 template<class T>
empty()1749 static auto empty()
1750 -> decltype(from<T>()) {
1751 return from<T>();
1752 }
1753 /*! @copydoc rx-empty.hpp
1754 */
1755 template<class T, class Coordination>
empty(Coordination cn)1756 static auto empty(Coordination cn)
1757 -> decltype(from<T>(std::move(cn))) {
1758 return from<T>(std::move(cn));
1759 }
1760
1761 /*! @copydoc rx-error.hpp
1762 */
1763 template<class T, class Exception>
error(Exception && e)1764 static auto error(Exception&& e)
1765 -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1766 return rxs::error<T>(std::forward<Exception>(e));
1767 }
1768 /*! @copydoc rx-error.hpp
1769 */
1770 template<class T, class Exception, class Coordination>
error(Exception && e,Coordination cn)1771 static auto error(Exception&& e, Coordination cn)
1772 -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1773 return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1774 }
1775
1776 /*! @copydoc rx-scope.hpp
1777 */
1778 template<class ResourceFactory, class ObservableFactory>
scope(ResourceFactory rf,ObservableFactory of)1779 static auto scope(ResourceFactory rf, ObservableFactory of)
1780 -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1781 return rxs::scope(std::move(rf), std::move(of));
1782 }
1783 };
1784
1785 }
1786
1787 //
1788 // support range() >> filter() >> subscribe() syntax
1789 // '>>' is spelled 'stream'
1790 //
1791 template<class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1792 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1793 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1794 return source.op(std::forward<OperatorFactory>(of));
1795 }
1796
1797 //
1798 // support range() | filter() | subscribe() syntax
1799 // '|' is spelled 'pipe'
1800 //
1801 template<class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1802 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1803 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1804 return source.op(std::forward<OperatorFactory>(of));
1805 }
1806
1807 #endif
1808