1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 use std::sync::{Arc, Mutex};
16 
17 use crate::*;
18 
/// A simple sharable helper that can be used as a [BufferSubscription] by a [BufferSubscriber] and
/// as a state tracker by a [BufferPublisher].
///
/// The subscription state (pending request count and cancellation flag) is held in an
/// `Arc<Mutex<_>>`, so cloning this type yields another handle to the same state rather than
/// an independent copy.
#[derive(Clone, Debug)]
pub struct SharedBufferSubscription(Arc<Mutex<BufferSubscriptionData>>);
23 
/// State shared between every clone of a [SharedBufferSubscription].
///
/// The derived [Default] starts with zero pending requests and not cancelled.
#[derive(Debug, Default)]
struct BufferSubscriptionData {
    /// Requests added by [BufferSubscription::request] and not yet consumed by
    /// [SharedBufferSubscription::take_request].
    requests: u64,
    /// Set to `true` by [BufferSubscription::cancel]; nothing in this file resets it.
    is_cancelled: bool,
}
29 
30 impl SharedBufferSubscription {
31     /// Create a new [SharedBufferSubscription].
new() -> Self32     pub fn new() -> Self {
33         SharedBufferSubscription::default()
34     }
35 
36     /// Clone this [SharedBufferSubscription] so it can be passed into
37     /// [BufferSubscriber::on_subscribe].
clone_for_subscriber(&self) -> Box<dyn BufferSubscription>38     pub fn clone_for_subscriber(&self) -> Box<dyn BufferSubscription> {
39         Box::new(self.clone()) as Box<dyn BufferSubscription>
40     }
41 
42     /// If possible (not cancelled and with requests pending), take
take_request(&self) -> bool43     pub fn take_request(&self) -> bool {
44         let mut data = self.0.lock().unwrap();
45 
46         if data.is_cancelled || data.requests == 0 {
47             false
48         } else {
49             data.requests -= 1;
50             true
51         }
52     }
53 
54     /// Get the number of pending requests made by the [BufferSubscriber] via
55     /// [BufferSubscription::request].
pending_requests(&self) -> u6456     pub fn pending_requests(&self) -> u64 {
57         self.0.lock().unwrap().requests
58     }
59 
60     /// Get get whether the [BufferSubscriber] has called [BufferSubscription::cancel].
is_cancelled(&self) -> bool61     pub fn is_cancelled(&self) -> bool {
62         self.0.lock().unwrap().is_cancelled
63     }
64 }
65 
66 impl Default for SharedBufferSubscription {
default() -> Self67     fn default() -> Self {
68         Self(Arc::new(Mutex::new(BufferSubscriptionData::default())))
69     }
70 }
71 
72 impl BufferSubscription for SharedBufferSubscription {
request(&self, n: u64)73     fn request(&self, n: u64) {
74         let mut data = self.0.lock().unwrap();
75         if !data.is_cancelled {
76             data.requests = data.requests.saturating_add(n);
77         }
78     }
79 
cancel(&self)80     fn cancel(&self) {
81         let mut data = self.0.lock().unwrap();
82         data.is_cancelled = true;
83     }
84 }
85