1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! This module provides [BufferSubscriber] implementations and helpers.
16 
17 use std::sync::{Arc, Mutex};
18 
19 use crate::*;
20 
21 /// A [BufferSubscriber] wrapper that provides shared access.
22 ///
23 /// Normally, [BufferSubscriber]s are fully owned by the publisher that they are attached to. With
24 /// [SharedSubscriber], a
25 ///
26 /// # Panics
27 ///
28 /// [BufferSubscriber::on_subscribe] on a [SharedSubscriber] can only be called once, otherwise it
29 /// will panic. This is to prevent accidental and unsupported sharing between multiple publishers to
30 /// reflect the usual behavior where a publisher takes full ownership of a subscriber.
31 pub struct SharedSubscriber<S: BufferSubscriber>(Arc<Mutex<SharedSubscriberInner<S>>>);
32 
33 struct SharedSubscriberInner<S: BufferSubscriber> {
34     subscriber: S,
35     is_subscribed: bool,
36 }
37 
38 impl<S: BufferSubscriber> SharedSubscriber<S> {
39     /// Create a new wrapper around a [BufferSubscriber].
new(subscriber: S) -> Self40     pub fn new(subscriber: S) -> Self {
41         Self(Arc::new(Mutex::new(SharedSubscriberInner { subscriber, is_subscribed: false })))
42     }
43 
44     /// Provides access to an immutable reference to the wrapped [BufferSubscriber].
map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R45     pub fn map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R {
46         let inner = self.0.lock().unwrap();
47         f(&inner.subscriber)
48     }
49 
50     /// Provides access to a mutable reference to the wrapped [BufferSubscriber].
map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R51     pub fn map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R {
52         let mut inner = self.0.lock().unwrap();
53         f(&mut inner.subscriber)
54     }
55 }
56 
57 impl<S: BufferSubscriber> Clone for SharedSubscriber<S> {
clone(&self) -> Self58     fn clone(&self) -> Self {
59         Self(Arc::clone(&self.0))
60     }
61 }
62 
63 impl<S: BufferSubscriber> BufferSubscriber for SharedSubscriber<S> {
get_subscriber_stream_config(&self) -> StreamConfig64     fn get_subscriber_stream_config(&self) -> StreamConfig {
65         let inner = self.0.lock().unwrap();
66         inner.subscriber.get_subscriber_stream_config()
67     }
68 
on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>)69     fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>) {
70         let mut inner = self.0.lock().unwrap();
71         assert!(
72             !inner.is_subscribed,
73             "A SharedSubscriber can not be shared between two BufferPublishers"
74         );
75         inner.is_subscribed = true;
76 
77         inner.subscriber.on_subscribe(subscription);
78     }
79 
on_next(&mut self, frame: Frame)80     fn on_next(&mut self, frame: Frame) {
81         let mut inner = self.0.lock().unwrap();
82         inner.subscriber.on_next(frame);
83     }
84 
on_error(&mut self, error: BufferError)85     fn on_error(&mut self, error: BufferError) {
86         let mut inner = self.0.lock().unwrap();
87         inner.subscriber.on_error(error);
88     }
89 
on_complete(&mut self)90     fn on_complete(&mut self) {
91         let mut inner = self.0.lock().unwrap();
92         inner.subscriber.on_complete();
93     }
94 }
95