// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP)
6 #define RXCPP_RX_SUBSCRIPTION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace detail {
13 
14 template<class F>
15 struct is_unsubscribe_function
16 {
17     struct not_void {};
18     template<class CF>
19     static auto check(int) -> decltype((*(CF*)nullptr)());
20     template<class CF>
21     static not_void check(...);
22 
23     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
24 };
25 
26 }
27 
// Marker type for the subscription concept. is_subscription<T> checks for a
// nested subscription_tag whose pointer converts to tag_subscription*.
struct tag_subscription {};

// Base class that opts a type in to the subscription concept by exposing
// the nested subscription_tag typedef.
struct subscription_base
{
    typedef tag_subscription subscription_tag;
};
30 template<class T>
31 class is_subscription
32 {
33     template<class C>
34     static typename C::subscription_tag* check(int);
35     template<class C>
36     static void check(...);
37 public:
38     static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value;
39 };
40 
41 template<class Unsubscribe>
42 class static_subscription
43 {
44     typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type;
45     unsubscribe_call_type unsubscribe_call;
static_subscription()46     static_subscription()
47     {
48     }
49 public:
static_subscription(const static_subscription & o)50     static_subscription(const static_subscription& o)
51         : unsubscribe_call(o.unsubscribe_call)
52     {
53     }
static_subscription(static_subscription && o)54     static_subscription(static_subscription&& o)
55         : unsubscribe_call(std::move(o.unsubscribe_call))
56     {
57     }
static_subscription(unsubscribe_call_type s)58     static_subscription(unsubscribe_call_type s)
59         : unsubscribe_call(std::move(s))
60     {
61     }
unsubscribe() const62     void unsubscribe() const {
63         unsubscribe_call();
64     }
65 };
66 
67 class subscription : public subscription_base
68 {
69     class base_subscription_state : public std::enable_shared_from_this<base_subscription_state>
70     {
71         base_subscription_state();
72     public:
73 
base_subscription_state(bool initial)74         explicit base_subscription_state(bool initial)
75             : issubscribed(initial)
76         {
77         }
~base_subscription_state()78         virtual ~base_subscription_state() {}
unsubscribe()79         virtual void unsubscribe() {
80         }
81         std::atomic<bool> issubscribed;
82     };
83 public:
84     typedef std::weak_ptr<base_subscription_state> weak_state_type;
85 
86 private:
87     template<class I>
88     struct subscription_state : public base_subscription_state
89     {
90         typedef rxu::decay_t<I> inner_t;
subscription_staterxcpp::subscription::subscription_state91         subscription_state(inner_t i)
92             : base_subscription_state(true)
93             , inner(std::move(i))
94         {
95         }
unsubscriberxcpp::subscription::subscription_state96         virtual void unsubscribe() {
97             if (issubscribed.exchange(false)) {
98                 trace_activity().unsubscribe_enter(*this);
99                 inner.unsubscribe();
100                 trace_activity().unsubscribe_return(*this);
101             }
102         }
103         inner_t inner;
104     };
105 
106 protected:
107     std::shared_ptr<base_subscription_state> state;
108 
109     friend bool operator<(const subscription&, const subscription&);
110     friend bool operator==(const subscription&, const subscription&);
111 
112 private:
subscription(weak_state_type w)113     subscription(weak_state_type w)
114         : state(w.lock())
115     {
116         if (!state) {
117             std::terminate();
118         }
119     }
120 
subscription(std::shared_ptr<base_subscription_state> s)121     explicit subscription(std::shared_ptr<base_subscription_state> s)
122         : state(std::move(s))
123     {
124         if (!state) {
125             std::terminate();
126         }
127     }
128 public:
129 
subscription()130     subscription()
131         : state(std::make_shared<base_subscription_state>(false))
132     {
133         if (!state) {
134             std::terminate();
135         }
136     }
137     template<class U>
subscription(U u,typename std::enable_if<!is_subscription<U>::value,void ** >::type=nullptr)138     explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr)
139         : state(std::make_shared<subscription_state<U>>(std::move(u)))
140     {
141         if (!state) {
142             std::terminate();
143         }
144     }
145     template<class U>
subscription(U u,typename std::enable_if<!std::is_same<subscription,U>::value && is_subscription<U>::value,void ** >::type=nullptr)146     explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr)
147         // intentionally slice
148         : state(std::move((*static_cast<subscription*>(&u)).state))
149     {
150         if (!state) {
151             std::terminate();
152         }
153     }
subscription(const subscription & o)154     subscription(const subscription& o)
155         : state(o.state)
156     {
157         if (!state) {
158             std::terminate();
159         }
160     }
subscription(subscription && o)161     subscription(subscription&& o)
162         : state(std::move(o.state))
163     {
164         if (!state) {
165             std::terminate();
166         }
167     }
operator =(subscription o)168     subscription& operator=(subscription o) {
169         state = std::move(o.state);
170         return *this;
171     }
is_subscribed() const172     bool is_subscribed() const {
173         if (!state) {
174             std::terminate();
175         }
176         return state->issubscribed;
177     }
unsubscribe() const178     void unsubscribe() const {
179         if (!state) {
180             std::terminate();
181         }
182         auto keepAlive = state;
183         state->unsubscribe();
184     }
185 
get_weak()186     weak_state_type get_weak() {
187         return state;
188     }
189 
190     // Atomically promote weak subscription to strong.
191     // Calls std::terminate if w has already expired.
lock(weak_state_type w)192     static subscription lock(weak_state_type w) {
193         return subscription(w);
194     }
195 
196     // Atomically try to promote weak subscription to strong.
197     // Returns an empty maybe<> if w has already expired.
maybe_lock(weak_state_type w)198     static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
199         auto strong_subscription = w.lock();
200         if (!strong_subscription) {
201             return rxu::detail::maybe<subscription>{};
202         } else {
203             return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
204         }
205     }
206 };
207 
operator <(const subscription & lhs,const subscription & rhs)208 inline bool operator<(const subscription& lhs, const subscription& rhs) {
209     return lhs.state < rhs.state;
210 }
operator ==(const subscription & lhs,const subscription & rhs)211 inline bool operator==(const subscription& lhs, const subscription& rhs) {
212     return lhs.state == rhs.state;
213 }
operator !=(const subscription & lhs,const subscription & rhs)214 inline bool operator!=(const subscription& lhs, const subscription& rhs) {
215     return !(lhs == rhs);
216 }
217 
218 
make_subscription()219 inline auto make_subscription()
220     ->      subscription {
221     return  subscription();
222 }
223 template<class I>
make_subscription(I && i)224 auto make_subscription(I&& i)
225     -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
226             subscription>::type {
227     return  subscription(std::forward<I>(i));
228 }
229 template<class Unsubscribe>
make_subscription(Unsubscribe && u)230 auto make_subscription(Unsubscribe&& u)
231     -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
232             subscription>::type {
233     return  subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
234 }
235 
// Forward declaration: detail::shared_empty() is declared with this return
// type before the class itself is defined.
class composite_subscription;
237 
238 namespace detail {
239 
240 struct tag_composite_subscription_empty {};
241 
242 class composite_subscription_inner
243 {
244 private:
245     typedef subscription::weak_state_type weak_subscription;
246     struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
247     {
248         // invariant: cannot access this data without the lock held.
249         std::set<subscription> subscriptions;
250         // double checked locking:
251         //    issubscribed must be loaded again after each lock acquisition.
252         // invariant:
253         //    never call subscription::unsubscribe with lock held.
254         std::mutex lock;
255         // invariant: transitions from 'true' to 'false' exactly once, at any time.
256         std::atomic<bool> issubscribed;
257 
~composite_subscription_staterxcpp::detail::composite_subscription_inner::composite_subscription_state258         ~composite_subscription_state()
259         {
260             std::unique_lock<decltype(lock)> guard(lock);
261             subscriptions.clear();
262         }
263 
composite_subscription_staterxcpp::detail::composite_subscription_inner::composite_subscription_state264         composite_subscription_state()
265             : issubscribed(true)
266         {
267         }
composite_subscription_staterxcpp::detail::composite_subscription_inner::composite_subscription_state268         composite_subscription_state(tag_composite_subscription_empty)
269             : issubscribed(false)
270         {
271         }
272 
273         // Atomically add 's' to the set of subscriptions.
274         //
275         // If unsubscribe() has already occurred, this immediately
276         // calls s.unsubscribe().
277         //
278         // cs.unsubscribe() [must] happens-before s.unsubscribe()
279         //
280         // Due to the un-atomic nature of calling 's.unsubscribe()',
281         // it is possible to observe the unintuitive
282         // add(s)=>s.unsubscribe() prior
283         // to any of the unsubscribe()=>sN.unsubscribe().
addrxcpp::detail::composite_subscription_inner::composite_subscription_state284         inline weak_subscription add(subscription s) {
285             if (!issubscribed) {  // load.acq [seq_cst]
286                 s.unsubscribe();
287             } else if (s.is_subscribed()) {
288                 std::unique_lock<decltype(lock)> guard(lock);
289                 if (!issubscribed) {  // load.acq [seq_cst]
290                     // unsubscribe was called concurrently.
291                     guard.unlock();
292                     // invariant: do not call unsubscribe with lock held.
293                     s.unsubscribe();
294                 } else {
295                     subscriptions.insert(s);
296                 }
297             }
298             return s.get_weak();
299         }
300 
301         // Atomically remove 'w' from the set of subscriptions.
302         //
303         // This does nothing if 'w' was already previously removed,
304         // or refers to an expired value.
removerxcpp::detail::composite_subscription_inner::composite_subscription_state305         inline void remove(weak_subscription w) {
306             if (issubscribed) { // load.acq [seq_cst]
307                 rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
308 
309                 if (maybe_subscription.empty()) {
310                   // Do nothing if the subscription has already expired.
311                   return;
312                 }
313 
314                 std::unique_lock<decltype(lock)> guard(lock);
315                 // invariant: subscriptions must be accessed under the lock.
316 
317                 if (issubscribed) { // load.acq [seq_cst]
318                   subscription& s = maybe_subscription.get();
319                   subscriptions.erase(std::move(s));
320                 } // else unsubscribe() was called concurrently; this becomes a no-op.
321             }
322         }
323 
324         // Atomically clear all subscriptions that were observably added
325         // (and not subsequently observably removed).
326         //
327         // Un-atomically call unsubscribe on those subscriptions.
328         //
329         // forall subscriptions in {add(s1),add(s2),...}
330         //                         - {remove(s3), remove(s4), ...}:
331         //   cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
332         //
333         // cs.unsubscribe() observed-before cs.clear ==> do nothing.
clearrxcpp::detail::composite_subscription_inner::composite_subscription_state334         inline void clear() {
335             if (issubscribed) { // load.acq [seq_cst]
336                 std::unique_lock<decltype(lock)> guard(lock);
337 
338                 if (!issubscribed) { // load.acq [seq_cst]
339                   // unsubscribe was called concurrently.
340                   return;
341                 }
342 
343                 std::set<subscription> v(std::move(subscriptions));
344                 // invariant: do not call unsubscribe with lock held.
345                 guard.unlock();
346                 std::for_each(v.begin(), v.end(),
347                               [](const subscription& s) {
348                                 s.unsubscribe(); });
349             }
350         }
351 
352         // Atomically clear all subscriptions that were observably added
353         // (and not subsequently observably removed).
354         //
355         // Un-atomically call unsubscribe on those subscriptions.
356         //
357         // Switches to an 'unsubscribed' state, all subsequent
358         // adds are immediately unsubscribed.
359         //
360         // cs.unsubscribe() [must] happens-before
361         //     cs.add(s) ==> s.unsubscribe()
362         //
363         // forall subscriptions in {add(s1),add(s2),...}
364         //                         - {remove(s3), remove(s4), ...}:
365         //   cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
unsubscriberxcpp::detail::composite_subscription_inner::composite_subscription_state366         inline void unsubscribe() {
367             if (issubscribed.exchange(false)) {  // cas.acq_rel [seq_cst]
368                 std::unique_lock<decltype(lock)> guard(lock);
369 
370                 // is_subscribed can only transition to 'false' once,
371                 // does not need an extra atomic access here.
372 
373                 std::set<subscription> v(std::move(subscriptions));
374                 // invariant: do not call unsubscribe with lock held.
375                 guard.unlock();
376                 std::for_each(v.begin(), v.end(),
377                               [](const subscription& s) {
378                                 s.unsubscribe(); });
379             }
380         }
381     };
382 
383 public:
384     typedef std::shared_ptr<composite_subscription_state> shared_state_type;
385 
386 protected:
387     mutable shared_state_type state;
388 
389 public:
composite_subscription_inner()390     composite_subscription_inner()
391         : state(std::make_shared<composite_subscription_state>())
392     {
393     }
composite_subscription_inner(tag_composite_subscription_empty et)394     composite_subscription_inner(tag_composite_subscription_empty et)
395         : state(std::make_shared<composite_subscription_state>(et))
396     {
397     }
398 
composite_subscription_inner(const composite_subscription_inner & o)399     composite_subscription_inner(const composite_subscription_inner& o)
400         : state(o.state)
401     {
402         if (!state) {
403             std::terminate();
404         }
405     }
composite_subscription_inner(composite_subscription_inner && o)406     composite_subscription_inner(composite_subscription_inner&& o)
407         : state(std::move(o.state))
408     {
409         if (!state) {
410             std::terminate();
411         }
412     }
413 
operator =(composite_subscription_inner o)414     composite_subscription_inner& operator=(composite_subscription_inner o)
415     {
416         state = std::move(o.state);
417         if (!state) {
418             std::terminate();
419         }
420         return *this;
421     }
422 
add(subscription s) const423     inline weak_subscription add(subscription s) const {
424         if (!state) {
425             std::terminate();
426         }
427         return state->add(std::move(s));
428     }
remove(weak_subscription w) const429     inline void remove(weak_subscription w) const {
430         if (!state) {
431             std::terminate();
432         }
433         state->remove(std::move(w));
434     }
clear() const435     inline void clear() const {
436         if (!state) {
437             std::terminate();
438         }
439         state->clear();
440     }
unsubscribe()441     inline void unsubscribe() {
442         if (!state) {
443             std::terminate();
444         }
445         state->unsubscribe();
446     }
447 };
448 
449 inline composite_subscription shared_empty();
450 
451 }
452 
453 /*!
454     \brief controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
455 
456     \ingroup group-core
457 
458 */
459 class composite_subscription
460     : protected detail::composite_subscription_inner
461     , public subscription
462 {
463     typedef detail::composite_subscription_inner inner_type;
464 public:
465     typedef subscription::weak_state_type weak_subscription;
466 
composite_subscription(detail::tag_composite_subscription_empty et)467     composite_subscription(detail::tag_composite_subscription_empty et)
468         : inner_type(et)
469         , subscription() // use empty base
470     {
471     }
472 
473 public:
474 
composite_subscription()475     composite_subscription()
476         : inner_type()
477         , subscription(*static_cast<const inner_type*>(this))
478     {
479     }
480 
composite_subscription(const composite_subscription & o)481     composite_subscription(const composite_subscription& o)
482         : inner_type(o)
483         , subscription(static_cast<const subscription&>(o))
484     {
485     }
composite_subscription(composite_subscription && o)486     composite_subscription(composite_subscription&& o)
487         : inner_type(std::move(o))
488         , subscription(std::move(static_cast<subscription&>(o)))
489     {
490     }
491 
operator =(composite_subscription o)492     composite_subscription& operator=(composite_subscription o)
493     {
494         inner_type::operator=(std::move(o));
495         subscription::operator=(std::move(*static_cast<subscription*>(&o)));
496         return *this;
497     }
498 
empty()499     static inline composite_subscription empty() {
500         return detail::shared_empty();
501     }
502 
503     using subscription::is_subscribed;
504     using subscription::unsubscribe;
505 
506     using inner_type::clear;
507 
add(subscription s) const508     inline weak_subscription add(subscription s) const {
509         if (s == static_cast<const subscription&>(*this)) {
510             // do not nest the same subscription
511             std::terminate();
512             //return s.get_weak();
513         }
514         auto that = this->subscription::state.get();
515         trace_activity().subscription_add_enter(*that, s);
516         auto w = inner_type::add(std::move(s));
517         trace_activity().subscription_add_return(*that);
518         return w;
519     }
520 
521     template<class F>
add(F f) const522     auto add(F f) const
523     -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
524         return add(make_subscription(std::move(f)));
525     }
526 
remove(weak_subscription w) const527     inline void remove(weak_subscription w) const {
528         auto that = this->subscription::state.get();
529         trace_activity().subscription_remove_enter(*that, w);
530         inner_type::remove(w);
531         trace_activity().subscription_remove_return(*that);
532     }
533 };
534 
operator <(const composite_subscription & lhs,const composite_subscription & rhs)535 inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) {
536     return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs);
537 }
operator ==(const composite_subscription & lhs,const composite_subscription & rhs)538 inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
539     return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs);
540 }
operator !=(const composite_subscription & lhs,const composite_subscription & rhs)541 inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
542     return !(lhs == rhs);
543 }
544 
545 namespace detail {
546 
shared_empty()547 inline composite_subscription shared_empty() {
548     static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
549     return shared_empty;
550 }
551 
552 }
553 
554 template<class T>
555 class resource : public subscription_base
556 {
557 public:
558     typedef typename composite_subscription::weak_subscription weak_subscription;
559 
resource()560     resource()
561         : lifetime(composite_subscription())
562         , value(std::make_shared<rxu::detail::maybe<T>>())
563     {
564     }
565 
resource(T t,composite_subscription cs=composite_subscription ())566     explicit resource(T t, composite_subscription cs = composite_subscription())
567         : lifetime(std::move(cs))
568         , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
569     {
570         auto localValue = value;
571         lifetime.add(
572             [localValue](){
573                 localValue->reset();
574             }
575         );
576     }
577 
get()578     T& get() {
579         return value.get()->get();
580     }
get_subscription()581     composite_subscription& get_subscription() {
582         return lifetime;
583     }
584 
is_subscribed() const585     bool is_subscribed() const {
586         return lifetime.is_subscribed();
587     }
add(subscription s) const588     weak_subscription add(subscription s) const {
589         return lifetime.add(std::move(s));
590     }
591     template<class F>
add(F f) const592     auto add(F f) const
593     -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
594         return lifetime.add(make_subscription(std::move(f)));
595     }
remove(weak_subscription w) const596     void remove(weak_subscription w) const {
597         return lifetime.remove(std::move(w));
598     }
clear() const599     void clear() const {
600         return lifetime.clear();
601     }
unsubscribe() const602     void unsubscribe() const {
603         return lifetime.unsubscribe();
604     }
605 
606 protected:
607     composite_subscription lifetime;
608     std::shared_ptr<rxu::detail::maybe<T>> value;
609 };
610 
611 }
612 
613 #endif
614