1 // Copyright (C) 2023 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 //! This module provides [BufferSubscriber] implementations and helpers. 16 17 use std::sync::{Arc, Mutex}; 18 19 use crate::*; 20 21 /// A [BufferSubscriber] wrapper that provides shared access. 22 /// 23 /// Normally, [BufferSubscriber]s are fully owned by the publisher that they are attached to. With 24 /// [SharedSubscriber], a 25 /// 26 /// # Panics 27 /// 28 /// [BufferSubscriber::on_subscribe] on a [SharedSubscriber] can only be called once, otherwise it 29 /// will panic. This is to prevent accidental and unsupported sharing between multiple publishers to 30 /// reflect the usual behavior where a publisher takes full ownership of a subscriber. 31 pub struct SharedSubscriber<S: BufferSubscriber>(Arc<Mutex<SharedSubscriberInner<S>>>); 32 33 struct SharedSubscriberInner<S: BufferSubscriber> { 34 subscriber: S, 35 is_subscribed: bool, 36 } 37 38 impl<S: BufferSubscriber> SharedSubscriber<S> { 39 /// Create a new wrapper around a [BufferSubscriber]. new(subscriber: S) -> Self40 pub fn new(subscriber: S) -> Self { 41 Self(Arc::new(Mutex::new(SharedSubscriberInner { subscriber, is_subscribed: false }))) 42 } 43 44 /// Provides access to an immutable reference to the wrapped [BufferSubscriber]. map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R45 pub fn map_inner<R, F: FnOnce(&S) -> R>(&self, f: F) -> R { 46 let inner = self.0.lock().unwrap(); 47 f(&inner.subscriber) 48 } 49 50 /// Provides access to a mutable reference to the wrapped [BufferSubscriber]. map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R51 pub fn map_inner_mut<R, F: FnOnce(&mut S) -> R>(&self, f: F) -> R { 52 let mut inner = self.0.lock().unwrap(); 53 f(&mut inner.subscriber) 54 } 55 } 56 57 impl<S: BufferSubscriber> Clone for SharedSubscriber<S> { clone(&self) -> Self58 fn clone(&self) -> Self { 59 Self(Arc::clone(&self.0)) 60 } 61 } 62 63 impl<S: BufferSubscriber> BufferSubscriber for SharedSubscriber<S> { get_subscriber_stream_config(&self) -> StreamConfig64 fn get_subscriber_stream_config(&self) -> StreamConfig { 65 let inner = self.0.lock().unwrap(); 66 inner.subscriber.get_subscriber_stream_config() 67 } 68 on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>)69 fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>) { 70 let mut inner = self.0.lock().unwrap(); 71 assert!( 72 !inner.is_subscribed, 73 "A SharedSubscriber can not be shared between two BufferPublishers" 74 ); 75 inner.is_subscribed = true; 76 77 inner.subscriber.on_subscribe(subscription); 78 } 79 on_next(&mut self, frame: Frame)80 fn on_next(&mut self, frame: Frame) { 81 let mut inner = self.0.lock().unwrap(); 82 inner.subscriber.on_next(frame); 83 } 84 on_error(&mut self, error: BufferError)85 fn on_error(&mut self, error: BufferError) { 86 let mut inner = self.0.lock().unwrap(); 87 inner.subscriber.on_error(error); 88 } 89 on_complete(&mut self)90 fn on_complete(&mut self) { 91 let mut inner = self.0.lock().unwrap(); 92 inner.subscriber.on_complete(); 93 } 94 } 95