1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_BEHAVIOR_HPP)
6 #define RXCPP_RX_BEHAVIOR_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class T>
17 class behavior_observer : public detail::multicast_observer<T>
18 {
19     typedef behavior_observer<T> this_type;
20     typedef detail::multicast_observer<T> base_type;
21 
22     class behavior_observer_state : public std::enable_shared_from_this<behavior_observer_state>
23     {
24         mutable std::mutex lock;
25         mutable T value;
26 
27     public:
behavior_observer_state(T first)28         behavior_observer_state(T first)
29             : value(first)
30         {
31         }
32 
reset(T v) const33         void reset(T v) const {
34             std::unique_lock<std::mutex> guard(lock);
35             value = std::move(v);
36         }
get() const37         T get() const {
38             std::unique_lock<std::mutex> guard(lock);
39             return value;
40         }
41     };
42 
43     std::shared_ptr<behavior_observer_state> state;
44 
45 public:
behavior_observer(T f,composite_subscription l)46     behavior_observer(T f, composite_subscription l)
47         : base_type(l)
48         , state(std::make_shared<behavior_observer_state>(std::move(f)))
49     {
50     }
51 
get_subscriber() const52     subscriber<T> get_subscriber() const {
53         return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::behavior_observer<T>>(*this)).as_dynamic();
54     }
55 
get_value() const56     T get_value() const {
57         return state->get();
58     }
59 
60     template<class V>
on_next(V v) const61     void on_next(V v) const {
62         state->reset(v);
63         base_type::on_next(std::move(v));
64     }
65 };
66 
67 }
68 
69 template<class T>
70 class behavior
71 {
72     detail::behavior_observer<T> s;
73 
74 public:
behavior(T f,composite_subscription cs=composite_subscription ())75     explicit behavior(T f, composite_subscription cs = composite_subscription())
76         : s(std::move(f), cs)
77     {
78     }
79 
has_observers() const80     bool has_observers() const {
81         return s.has_observers();
82     }
83 
get_value() const84     T get_value() const {
85         return s.get_value();
86     }
87 
get_subscriber() const88     subscriber<T> get_subscriber() const {
89         return s.get_subscriber();
90     }
91 
get_observable() const92     observable<T> get_observable() const {
93         auto keepAlive = s;
94         return make_observable_dynamic<T>([=](subscriber<T> o){
95             if (keepAlive.get_subscription().is_subscribed()) {
96                 o.on_next(get_value());
97             }
98             keepAlive.add(s.get_subscriber(), std::move(o));
99         });
100     }
101 };
102 
103 }
104 
105 }
106 
107 #endif
108