1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-element_at.hpp
6 
7     \brief Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission.
8 
9     \param index  the index of the element to return.
10 
    \return An observable that emits the item located at the specified index location.
12 
13     \sample
14     \snippet element_at.cpp element_at sample
15     \snippet output.txt element_at sample
16 */
17 
18 #if !defined(RXCPP_OPERATORS_RX_ELEMENT_AT_HPP)
19 #define RXCPP_OPERATORS_RX_ELEMENT_AT_HPP
20 
21 #include "../rx-includes.hpp"
22 
23 namespace rxcpp {
24 
25 namespace operators {
26 
27 namespace detail {
28 
// Empty tag type parameterized on an argument pack; it carries no data and is
// only used as a type by the element_at_invalid definitions in this file.
template<class... Args>
struct element_at_invalid_arguments
{
};
31 
32 template<class... AN>
33 struct element_at_invalid : public rxo::operator_base<element_at_invalid_arguments<AN...>> {
34     using type = observable<element_at_invalid_arguments<AN...>, element_at_invalid<AN...>>;
35 };
36 template<class... AN>
37 using element_at_invalid_t = typename element_at_invalid<AN...>::type;
38 
39 template<class T>
40 struct element_at {
41     typedef rxu::decay_t<T> source_value_type;
42 
43     struct element_at_values {
element_at_valuesrxcpp::operators::detail::element_at::element_at_values44         element_at_values(int i)
45             : index(i)
46         {
47         }
48         int index;
49     };
50 
51     element_at_values initial;
52 
element_atrxcpp::operators::detail::element_at53     element_at(int i)
54         : initial(i)
55     {
56     }
57 
58     template<class Subscriber>
59     struct element_at_observer : public element_at_values
60     {
61         typedef element_at_observer<Subscriber> this_type;
62         typedef source_value_type value_type;
63         typedef rxu::decay_t<Subscriber> dest_type;
64         typedef observer<value_type, this_type> observer_type;
65         dest_type dest;
66         mutable int current;
67 
element_at_observerrxcpp::operators::detail::element_at::element_at_observer68         element_at_observer(dest_type d, element_at_values v)
69             : element_at_values(v),
70               dest(d),
71               current(0)
72         {
73         }
on_nextrxcpp::operators::detail::element_at::element_at_observer74         void on_next(source_value_type v) const {
75             if (current++ == this->index) {
76                 dest.on_next(v);
77                 dest.on_completed();
78             }
79         }
on_errorrxcpp::operators::detail::element_at::element_at_observer80         void on_error(rxu::error_ptr e) const {
81             dest.on_error(e);
82         }
on_completedrxcpp::operators::detail::element_at::element_at_observer83         void on_completed() const {
84             if(current <= this->index) {
85                 dest.on_error(rxu::make_error_ptr(std::range_error("index is out of bounds")));
86             }
87         }
88 
makerxcpp::operators::detail::element_at::element_at_observer89         static subscriber<value_type, observer_type> make(dest_type d, element_at_values v) {
90             return make_subscriber<value_type>(d, this_type(d, v));
91         }
92     };
93 
94     template<class Subscriber>
operator ()rxcpp::operators::detail::element_at95     auto operator()(Subscriber dest) const
96         -> decltype(element_at_observer<Subscriber>::make(std::move(dest), initial)) {
97         return      element_at_observer<Subscriber>::make(std::move(dest), initial);
98     }
99 };
100 
101 }
102 
103 /*! @copydoc rx-element_at.hpp
104 */
105 template<class... AN>
element_at(AN &&...an)106 auto element_at(AN&&... an)
107     ->      operator_factory<element_at_tag, AN...> {
108      return operator_factory<element_at_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
109 }
110 
111 }
112 
113 template<>
114 struct member_overload<element_at_tag>
115 {
116     template<class Observable,
117         class Enabled = rxu::enable_if_all_true_type_t<
118             is_observable<Observable>
119             >,
120         class SourceValue = rxu::value_type_t<Observable>,
121         class element_at = rxo::detail::element_at<SourceValue>>
memberrxcpp::member_overload122     static auto member(Observable&& o, int index)
123         -> decltype(o.template lift<SourceValue>(element_at(index))) {
124         return      o.template lift<SourceValue>(element_at(index));
125     }
126 
127     template<class... AN>
memberrxcpp::member_overload128     static operators::detail::element_at_invalid_t<AN...> member(const AN...) {
129         std::terminate();
130         return {};
131         static_assert(sizeof...(AN) == 10000, "element_at takes (required int)");
132     }
133 };
134 
135 }
136 
137 #endif
138