1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-reduce.hpp
6 
7     \brief For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned.
8 
9     \tparam Seed            the type of the initial value for the accumulator
10     \tparam Accumulator     the type of the data accumulating function
11     \tparam ResultSelector  the type of the result producing function
12 
13     \param seed  the initial value for the accumulator
14     \param a     an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call
15     \param rs    a result producing function that makes the final value from the last accumulator call result
16 
17     \return  An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable.
18 
19     Some basic reduce-type operators have already been implemented:
20     - rxcpp::operators::first
21     - rxcpp::operators::last
22     - rxcpp::operators::count
23     - rxcpp::operators::sum
24     - rxcpp::operators::average
25     - rxcpp::operators::min
26     - rxcpp::operators::max
27 
28     \sample
29     Geometric mean of source values:
30     \snippet reduce.cpp reduce sample
31     \snippet output.txt reduce sample
32 
33     If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector:
34     \snippet reduce.cpp reduce empty sample
35     \snippet output.txt reduce empty sample
36 
37     If the accumulator raises an exception, it is returned by the resulting observable in on_error:
38     \snippet reduce.cpp reduce exception from accumulator sample
39     \snippet output.txt reduce exception from accumulator sample
40 
41     The same for exceptions raised by the result selector:
42     \snippet reduce.cpp reduce exception from result selector sample
43     \snippet output.txt reduce exception from result selector sample
44 */
45 
46 #if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP)
47 #define RXCPP_OPERATORS_RX_REDUCE_HPP
48 
49 #include "../rx-includes.hpp"
50 
51 namespace rxcpp {
52 
53 namespace operators {
54 
55 namespace detail {
56 
57 template<class... AN>
58 struct reduce_invalid_arguments {};
59 
60 template<class... AN>
61 struct reduce_invalid : public rxo::operator_base<reduce_invalid_arguments<AN...>> {
62     using type = observable<reduce_invalid_arguments<AN...>, reduce_invalid<AN...>>;
63 };
64 template<class... AN>
65 using reduce_invalid_t = typename reduce_invalid<AN...>::type;
66 
67 template<class Seed, class ResultSelector>
68 struct is_result_function_for {
69 
70     typedef rxu::decay_t<ResultSelector> result_selector_type;
71     typedef rxu::decay_t<Seed> seed_type;
72 
73     struct tag_not_valid {};
74 
75     template<class CS, class CRS>
76     static auto check(int) -> decltype((*(CRS*)nullptr)(*(CS*)nullptr));
77     template<class CS, class CRS>
78     static tag_not_valid check(...);
79 
80     typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
81     static const bool value = !std::is_same<type, tag_not_valid>::value;
82 };
83 
84 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
85 struct reduce_traits
86 {
87     typedef rxu::decay_t<Observable> source_type;
88     typedef rxu::decay_t<Accumulator> accumulator_type;
89     typedef rxu::decay_t<ResultSelector> result_selector_type;
90     typedef rxu::decay_t<Seed> seed_type;
91 
92     typedef T source_value_type;
93 
94     typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
95 };
96 
97 template<class T, class Observable, class Accumulator, class ResultSelector, class Seed>
98 struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>>
99 {
100     typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type;
101     typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits;
102 
103     typedef typename traits::source_type source_type;
104     typedef typename traits::accumulator_type accumulator_type;
105     typedef typename traits::result_selector_type result_selector_type;
106     typedef typename traits::seed_type seed_type;
107 
108     typedef typename traits::source_value_type source_value_type;
109     typedef typename traits::value_type value_type;
110 
111     struct reduce_initial_type
112     {
~reduce_initial_typerxcpp::operators::detail::reduce::reduce_initial_type113         ~reduce_initial_type()
114         {
115         }
reduce_initial_typerxcpp::operators::detail::reduce::reduce_initial_type116         reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
117             : source(std::move(o))
118             , accumulator(std::move(a))
119             , result_selector(std::move(rs))
120             , seed(std::move(s))
121         {
122         }
123         source_type source;
124         accumulator_type accumulator;
125         result_selector_type result_selector;
126         seed_type seed;
127 
128     private:
129         reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
130     };
131     reduce_initial_type initial;
132 
~reducerxcpp::operators::detail::reduce133     ~reduce()
134     {
135     }
reducerxcpp::operators::detail::reduce136     reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
137         : initial(std::move(o), std::move(a), std::move(rs), std::move(s))
138     {
139     }
140     template<class Subscriber>
on_subscriberxcpp::operators::detail::reduce141     void on_subscribe(Subscriber o) const {
142         struct reduce_state_type
143             : public reduce_initial_type
144             , public std::enable_shared_from_this<reduce_state_type>
145         {
146             reduce_state_type(reduce_initial_type i, Subscriber scrbr)
147                 : reduce_initial_type(i)
148                 , source(i.source)
149                 , current(reduce_initial_type::seed)
150                 , out(std::move(scrbr))
151             {
152             }
153             source_type source;
154             seed_type current;
155             Subscriber out;
156 
157         private:
158             reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE;
159         };
160         auto state = std::make_shared<reduce_state_type>(initial, std::move(o));
161         state->source.subscribe(
162             state->out,
163         // on_next
164             [state](T t) {
165                 seed_type next = state->accumulator(std::move(state->current), std::move(t));
166                 state->current = std::move(next);
167             },
168         // on_error
169             [state](rxu::error_ptr e) {
170                 state->out.on_error(e);
171             },
172         // on_completed
173             [state]() {
174                 auto result = on_exception(
175                     [&](){return state->result_selector(std::move(state->current));},
176                     state->out);
177                 if (result.empty()) {
178                     return;
179                 }
180                 state->out.on_next(std::move(result.get()));
181                 state->out.on_completed();
182             }
183         );
184     }
185 private:
186     reduce& operator=(reduce o) RXCPP_DELETE;
187 };
188 
189 template<class T>
190 struct initialize_seeder {
191     typedef T seed_type;
seedrxcpp::operators::detail::initialize_seeder192     static seed_type seed() {
193         return seed_type{};
194     }
195 };
196 
197 template<class T>
198 struct average {
199     struct seed_type
200     {
seed_typerxcpp::operators::detail::average::seed_type201         seed_type()
202             : value()
203             , count(0)
204         {
205         }
206         rxu::maybe<T> value;
207         int count;
208         rxu::detail::maybe<double> stage;
209     };
seedrxcpp::operators::detail::average210     static seed_type seed() {
211         return seed_type{};
212     }
213     template<class U>
operator ()rxcpp::operators::detail::average214     seed_type operator()(seed_type a, U&& v) {
215         if (a.count != 0 &&
216             (a.count == std::numeric_limits<int>::max() ||
217             ((v > 0) && (*(a.value) > (std::numeric_limits<T>::max() - v))) ||
218             ((v < 0) && (*(a.value) < (std::numeric_limits<T>::min() - v))))) {
219             // would overflow, calc existing and reset for next batch
220             // this will add error to the final result, but the alternative
221             // is to fail on overflow
222             double avg = static_cast<double>(*(a.value)) / a.count;
223             if (!a.stage.empty()) {
224                 a.stage.reset((*a.stage + avg) / 2);
225             } else {
226                 a.stage.reset(avg);
227             }
228             a.value.reset(std::forward<U>(v));
229             a.count = 1;
230         } else if (a.value.empty()) {
231             a.value.reset(std::forward<U>(v));
232             a.count = 1;
233         } else {
234             *(a.value) += v;
235             ++a.count;
236         }
237         return a;
238     }
operator ()rxcpp::operators::detail::average239     double operator()(seed_type a) {
240         if (!a.value.empty()) {
241             double avg = static_cast<double>(*(a.value)) / a.count;
242             if (!a.stage.empty()) {
243                 avg = (*a.stage + avg) / 2;
244             }
245             return avg;
246         }
247         rxu::throw_exception(rxcpp::empty_error("average() requires a stream with at least one value"));
248     }
249 };
250 
251 template<class T>
252 struct sum {
253     typedef rxu::maybe<T> seed_type;
seedrxcpp::operators::detail::sum254     static seed_type seed() {
255         return seed_type();
256     }
257     template<class U>
operator ()rxcpp::operators::detail::sum258     seed_type operator()(seed_type a, U&& v) const {
259         if (a.empty())
260             a.reset(std::forward<U>(v));
261         else
262             *a = *a + v;
263         return a;
264     }
operator ()rxcpp::operators::detail::sum265     T operator()(seed_type a) const {
266         if (a.empty())
267             rxu::throw_exception(rxcpp::empty_error("sum() requires a stream with at least one value"));
268         return *a;
269     }
270 };
271 
272 template<class T>
273 struct max {
274     typedef rxu::maybe<T> seed_type;
seedrxcpp::operators::detail::max275     static seed_type seed() {
276         return seed_type();
277     }
278     template<class U>
operator ()rxcpp::operators::detail::max279     seed_type operator()(seed_type a, U&& v) {
280         if (a.empty() || *a < v)
281             a.reset(std::forward<U>(v));
282         return a;
283     }
operator ()rxcpp::operators::detail::max284     T operator()(seed_type a) {
285         if (a.empty())
286             rxu::throw_exception(rxcpp::empty_error("max() requires a stream with at least one value"));
287         return *a;
288     }
289 };
290 
291 template<class T>
292 struct min {
293     typedef rxu::maybe<T> seed_type;
seedrxcpp::operators::detail::min294     static seed_type seed() {
295         return seed_type();
296     }
297     template<class U>
operator ()rxcpp::operators::detail::min298     seed_type operator()(seed_type a, U&& v) {
299         if (a.empty() || v < *a)
300             a.reset(std::forward<U>(v));
301         return a;
302     }
operator ()rxcpp::operators::detail::min303     T operator()(seed_type a) {
304         if (a.empty())
305             rxu::throw_exception(rxcpp::empty_error("min() requires a stream with at least one value"));
306         return *a;
307     }
308 };
309 
310 template<class T>
311 struct first {
312     using seed_type = rxu::maybe<T>;
seedrxcpp::operators::detail::first313     static seed_type seed() {
314         return seed_type();
315     }
316     template<class U>
operator ()rxcpp::operators::detail::first317     seed_type operator()(seed_type a, U&& v) {
318         a.reset(std::forward<U>(v));
319         return a;
320     }
operator ()rxcpp::operators::detail::first321     T operator()(seed_type a) {
322         if (a.empty()) {
323             rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
324         }
325         return *a;
326     }
327 };
328 
329 template<class T>
330 struct last {
331     using seed_type = rxu::maybe<T>;
seedrxcpp::operators::detail::last332     static seed_type seed() {
333         return seed_type();
334     }
335     template<class U>
operator ()rxcpp::operators::detail::last336     seed_type operator()(seed_type a, U&& v) {
337         a.reset(std::forward<U>(v));
338         return a;
339     }
operator ()rxcpp::operators::detail::last340     T operator()(seed_type a) {
341         if (a.empty()) {
342             rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
343         }
344         return *a;
345     }
346 };
347 
348 }
349 
350 /*! @copydoc rx-reduce.hpp
351 */
352 template<class... AN>
reduce(AN &&...an)353 auto reduce(AN&&... an)
354     ->     operator_factory<reduce_tag, AN...> {
355     return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
356 }
357 
358 /*! @copydoc rx-reduce.hpp
359 */
360 template<class... AN>
accumulate(AN &&...an)361 auto accumulate(AN&&... an)
362     ->     operator_factory<reduce_tag, AN...> {
363     return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
364 }
365 
366 /*! \brief For each item from this observable reduce it by sending only the first item.
367 
368     \return  An observable that emits only the very first item emitted by the source observable.
369 
370     \sample
371     \snippet math.cpp first sample
372     \snippet output.txt first sample
373 
374     When the source observable calls on_error:
375     \snippet math.cpp first empty sample
376     \snippet output.txt first empty sample
377 */
first()378 inline auto first()
379     ->     operator_factory<first_tag> {
380     return operator_factory<first_tag>(std::tuple<>{});
381 }
382 
383 /*! \brief For each item from this observable reduce it by sending only the last item.
384 
385     \return  An observable that emits only the very last item emitted by the source observable.
386 
387     \sample
388     \snippet math.cpp last sample
389     \snippet output.txt last sample
390 
391     When the source observable calls on_error:
392     \snippet math.cpp last empty sample
393     \snippet output.txt last empty sample
394 */
last()395 inline auto last()
396     ->     operator_factory<last_tag> {
397     return operator_factory<last_tag>(std::tuple<>{});
398 }
399 
400 /*! \brief For each item from this observable reduce it by incrementing a count.
401 
402     \return  An observable that emits a single item: the number of elements emitted by the source observable.
403 
404     \sample
405     \snippet math.cpp count sample
406     \snippet output.txt count sample
407 
408     When the source observable calls on_error:
409     \snippet math.cpp count error sample
410     \snippet output.txt count error sample
411 */
count()412 inline auto count()
413     ->     operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>> {
414     return operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>(std::make_tuple(0, rxu::count(), rxu::take_at<0>()));
415 }
416 
417 /*! \brief For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
418 
419     \return  An observable that emits a single item: the average of elements emitted by the source observable.
420 
421     \sample
422     \snippet math.cpp average sample
423     \snippet output.txt average sample
424 
425     When the source observable completes without emitting any items:
426     \snippet math.cpp average empty sample
427     \snippet output.txt average empty sample
428 
429     When the source observable calls on_error:
430     \snippet math.cpp average error sample
431     \snippet output.txt average error sample
432 */
average()433 inline auto average()
434     ->     operator_factory<average_tag> {
435     return operator_factory<average_tag>(std::tuple<>{});
436 }
437 
438 /*! \brief For each item from this observable reduce it by adding to the previous items.
439 
440     \return  An observable that emits a single item: the sum of elements emitted by the source observable.
441 
442     \sample
443     \snippet math.cpp sum sample
444     \snippet output.txt sum sample
445 
446     When the source observable completes without emitting any items:
447     \snippet math.cpp sum empty sample
448     \snippet output.txt sum empty sample
449 
450     When the source observable calls on_error:
451     \snippet math.cpp sum error sample
452     \snippet output.txt sum error sample
453 */
sum()454 inline auto sum()
455     ->     operator_factory<sum_tag> {
456     return operator_factory<sum_tag>(std::tuple<>{});
457 }
458 
459 /*! \brief For each item from this observable reduce it by taking the min value of the previous items.
460 
461     \return  An observable that emits a single item: the min of elements emitted by the source observable.
462 
463     \sample
464     \snippet math.cpp min sample
465     \snippet output.txt min sample
466 
467     When the source observable completes without emitting any items:
468     \snippet math.cpp min empty sample
469     \snippet output.txt min empty sample
470 
471     When the source observable calls on_error:
472     \snippet math.cpp min error sample
473     \snippet output.txt min error sample
474 */
min()475 inline auto min()
476     ->     operator_factory<min_tag> {
477     return operator_factory<min_tag>(std::tuple<>{});
478 }
479 
480 /*! \brief For each item from this observable reduce it by taking the max value of the previous items.
481 
482     \return  An observable that emits a single item: the max of elements emitted by the source observable.
483 
484     \sample
485     \snippet math.cpp max sample
486     \snippet output.txt max sample
487 
488     When the source observable completes without emitting any items:
489     \snippet math.cpp max empty sample
490     \snippet output.txt max empty sample
491 
492     When the source observable calls on_error:
493     \snippet math.cpp max error sample
494     \snippet output.txt max error sample
495 */
max()496 inline auto max()
497     ->     operator_factory<max_tag> {
498     return operator_factory<max_tag>(std::tuple<>{});
499 }
500 
501 }
502 
503 template<>
504 struct member_overload<reduce_tag>
505 {
506 
507     template<class Observable, class Seed, class Accumulator, class ResultSelector,
508         class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
509         class Value = rxu::value_type_t<Reduce>,
510         class Result = observable<Value, Reduce>>
memberrxcpp::member_overload511     static Result member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r)
512     {
513         return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s)));
514     }
515 
516     template<class Observable, class Seed, class Accumulator,
517         class ResultSelector=rxu::detail::take_at<0>,
518         class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
519         class Value = rxu::value_type_t<Reduce>,
520         class Result = observable<Value, Reduce>>
memberrxcpp::member_overload521     static Result member(Observable&& o, Seed&& s, Accumulator&& a)
522     {
523         return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s)));
524     }
525 
526     template<class... AN>
memberrxcpp::member_overload527     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
528         std::terminate();
529         return {};
530         static_assert(sizeof...(AN) == 10000, "reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue");
531     }
532 };
533 
534 template<>
535 struct member_overload<first_tag>
536 {
537     template<class Observable,
538         class SValue = rxu::value_type_t<Observable>,
539         class Operation = operators::detail::first<SValue>,
540         class Seed = decltype(Operation::seed()),
541         class Accumulator = Operation,
542         class ResultSelector = Operation,
543         class TakeOne = decltype(((rxu::decay_t<Observable>*)nullptr)->take(1)),
544         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<TakeOne>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
545         class RValue = rxu::value_type_t<Reduce>,
546         class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload547     static Result member(Observable&& o)
548     {
549         return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed()));
550     }
551 
552     template<class... AN>
memberrxcpp::member_overload553     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
554         std::terminate();
555         return {};
556         static_assert(sizeof...(AN) == 10000, "first does not support Observable::value_type");
557     }
558 };
559 
560 template<>
561 struct member_overload<last_tag>
562 {
563     template<class Observable,
564         class SValue = rxu::value_type_t<Observable>,
565         class Operation = operators::detail::last<SValue>,
566         class Seed = decltype(Operation::seed()),
567         class Accumulator = Operation,
568         class ResultSelector = Operation,
569         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
570         class RValue = rxu::value_type_t<Reduce>,
571         class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload572     static Result member(Observable&& o)
573     {
574         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
575     }
576 
577     template<class... AN>
memberrxcpp::member_overload578     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
579         std::terminate();
580         return {};
581         static_assert(sizeof...(AN) == 10000, "last does not support Observable::value_type");
582     }
583 };
584 
585 template<>
586 struct member_overload<sum_tag>
587 {
588     template<class Observable,
589         class SValue = rxu::value_type_t<Observable>,
590         class Operation = operators::detail::sum<SValue>,
591         class Seed = decltype(Operation::seed()),
592         class Accumulator = Operation,
593         class ResultSelector = Operation,
594         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
595         class RValue = rxu::value_type_t<Reduce>,
596         class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload597     static Result member(Observable&& o)
598     {
599         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
600     }
601 
602     template<class... AN>
memberrxcpp::member_overload603     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
604         std::terminate();
605         return {};
606         static_assert(sizeof...(AN) == 10000, "sum does not support Observable::value_type");
607     }
608 };
609 
610 template<>
611 struct member_overload<average_tag>
612 {
613     template<class Observable,
614         class SValue = rxu::value_type_t<Observable>,
615         class Operation = operators::detail::average<SValue>,
616         class Seed = decltype(Operation::seed()),
617         class Accumulator = Operation,
618         class ResultSelector = Operation,
619         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
620         class RValue = rxu::value_type_t<Reduce>,
621         class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload622     static Result member(Observable&& o)
623     {
624         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
625     }
626 
627     template<class... AN>
memberrxcpp::member_overload628     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
629         std::terminate();
630         return {};
631         static_assert(sizeof...(AN) == 10000, "average does not support Observable::value_type");
632     }
633 };
634 
635 template<>
636 struct member_overload<max_tag>
637 {
638     template<class Observable,
639         class SValue = rxu::value_type_t<Observable>,
640         class Operation = operators::detail::max<SValue>,
641         class Seed = decltype(Operation::seed()),
642         class Accumulator = Operation,
643         class ResultSelector = Operation,
644         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
645         class RValue = rxu::value_type_t<Reduce>,
646         class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload647     static Result member(Observable&& o)
648     {
649         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
650     }
651 
652     template<class... AN>
memberrxcpp::member_overload653     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
654         std::terminate();
655         return {};
656         static_assert(sizeof...(AN) == 10000, "max does not support Observable::value_type");
657     }
658 };
659 
660 template<>
661 struct member_overload<min_tag>
662 {
663     template<class Observable,
664         class SValue = rxu::value_type_t<Observable>,
665         class Operation = operators::detail::min<SValue>,
666         class Seed = decltype(Operation::seed()),
667         class Accumulator = Operation,
668         class ResultSelector = Operation,
669         class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>,
670         class RValue = rxu::value_type_t<Reduce>,
671         class Result = observable<RValue, Reduce>>
memberrxcpp::member_overload672     static Result member(Observable&& o)
673     {
674         return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed()));
675     }
676 
677     template<class... AN>
memberrxcpp::member_overload678     static operators::detail::reduce_invalid_t<AN...> member(AN...) {
679         std::terminate();
680         return {};
681         static_assert(sizeof...(AN) == 10000, "min does not support Observable::value_type");
682     }
683 };
684 
685 }
686 
687 #endif
688