// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! This module provides [BufferSubscriber] implementations and helpers. use std::sync::{Arc, Mutex}; use crate::*; /// A [BufferSubscriber] wrapper that provides shared access. /// /// Normally, [BufferSubscriber]s are fully owned by the publisher that they are attached to. With /// [SharedSubscriber], a /// /// # Panics /// /// [BufferSubscriber::on_subscribe] on a [SharedSubscriber] can only be called once, otherwise it /// will panic. This is to prevent accidental and unsupported sharing between multiple publishers to /// reflect the usual behavior where a publisher takes full ownership of a subscriber. pub struct SharedSubscriber(Arc>>); struct SharedSubscriberInner { subscriber: S, is_subscribed: bool, } impl SharedSubscriber { /// Create a new wrapper around a [BufferSubscriber]. pub fn new(subscriber: S) -> Self { Self(Arc::new(Mutex::new(SharedSubscriberInner { subscriber, is_subscribed: false }))) } /// Provides access to an immutable reference to the wrapped [BufferSubscriber]. pub fn map_inner R>(&self, f: F) -> R { let inner = self.0.lock().unwrap(); f(&inner.subscriber) } /// Provides access to a mutable reference to the wrapped [BufferSubscriber]. pub fn map_inner_mut R>(&self, f: F) -> R { let mut inner = self.0.lock().unwrap(); f(&mut inner.subscriber) } } impl Clone for SharedSubscriber { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } } impl BufferSubscriber for SharedSubscriber { fn get_subscriber_stream_config(&self) -> StreamConfig { let inner = self.0.lock().unwrap(); inner.subscriber.get_subscriber_stream_config() } fn on_subscribe(&mut self, subscription: Box) { let mut inner = self.0.lock().unwrap(); assert!( !inner.is_subscribed, "A SharedSubscriber can not be shared between two BufferPublishers" ); inner.is_subscribed = true; inner.subscriber.on_subscribe(subscription); } fn on_next(&mut self, frame: Frame) { let mut inner = self.0.lock().unwrap(); inner.subscriber.on_next(frame); } fn on_error(&mut self, error: BufferError) { let mut inner = self.0.lock().unwrap(); inner.subscriber.on_error(error); } fn on_complete(&mut self) { let mut inner = self.0.lock().unwrap(); inner.subscriber.on_complete(); } }