1 // Copyright (C) 2023 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 use std::sync::{Arc, Mutex}; 16 17 use crate::*; 18 19 /// A simple sharable helper that can be used as a [BufferSubscription] by a [BufferSubscriber] and 20 /// as a state tracker by a [BufferPublisher]. 21 #[derive(Clone, Debug)] 22 pub struct SharedBufferSubscription(Arc<Mutex<BufferSubscriptionData>>); 23 24 #[derive(Debug, Default)] 25 struct BufferSubscriptionData { 26 requests: u64, 27 is_cancelled: bool, 28 } 29 30 impl SharedBufferSubscription { 31 /// Create a new [SharedBufferSubscription]. new() -> Self32 pub fn new() -> Self { 33 SharedBufferSubscription::default() 34 } 35 36 /// Clone this [SharedBufferSubscription] so it can be passed into 37 /// [BufferSubscriber::on_subscribe]. clone_for_subscriber(&self) -> Box<dyn BufferSubscription>38 pub fn clone_for_subscriber(&self) -> Box<dyn BufferSubscription> { 39 Box::new(self.clone()) as Box<dyn BufferSubscription> 40 } 41 42 /// If possible (not cancelled and with requests pending), take take_request(&self) -> bool43 pub fn take_request(&self) -> bool { 44 let mut data = self.0.lock().unwrap(); 45 46 if data.is_cancelled || data.requests == 0 { 47 false 48 } else { 49 data.requests -= 1; 50 true 51 } 52 } 53 54 /// Get the number of pending requests made by the [BufferSubscriber] via 55 /// [BufferSubscription::request]. pending_requests(&self) -> u6456 pub fn pending_requests(&self) -> u64 { 57 self.0.lock().unwrap().requests 58 } 59 60 /// Get get whether the [BufferSubscriber] has called [BufferSubscription::cancel]. is_cancelled(&self) -> bool61 pub fn is_cancelled(&self) -> bool { 62 self.0.lock().unwrap().is_cancelled 63 } 64 } 65 66 impl Default for SharedBufferSubscription { default() -> Self67 fn default() -> Self { 68 Self(Arc::new(Mutex::new(BufferSubscriptionData::default()))) 69 } 70 } 71 72 impl BufferSubscription for SharedBufferSubscription { request(&self, n: u64)73 fn request(&self, n: u64) { 74 let mut data = self.0.lock().unwrap(); 75 if !data.is_cancelled { 76 data.requests = data.requests.saturating_add(n); 77 } 78 } 79 cancel(&self)80 fn cancel(&self) { 81 let mut data = self.0.lock().unwrap(); 82 data.is_cancelled = true; 83 } 84 } 85