1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 use crate::{
16     buffers::BufferPool, subscriptions::SharedBufferSubscription, BufferPublisher,
17     BufferSubscriber, Frame, StreamConfig,
18 };
19 
20 /// The [BufferPoolPublisher] submits buffers from a pool over to the subscriber.
21 pub struct BufferPoolPublisher {
22     stream_config: StreamConfig,
23     buffer_pool: BufferPool,
24     subscription: SharedBufferSubscription,
25     subscriber: Option<Box<dyn BufferSubscriber>>,
26 }
27 
28 impl BufferPoolPublisher {
29     /// The [BufferPoolPublisher] needs to initialize a [BufferPool], the [BufferPool] will create
30     /// all buffers at initialization using the stream_config.
new(stream_config: StreamConfig, size: usize) -> Option<Self>31     pub fn new(stream_config: StreamConfig, size: usize) -> Option<Self> {
32         BufferPool::new(size, stream_config).map(|buffer_pool| Self {
33             stream_config,
34             buffer_pool,
35             subscription: SharedBufferSubscription::new(),
36             subscriber: None,
37         })
38     }
39 
40     /// If the [SharedBufferSubscription] is ready for a [Frame], a buffer will be requested from
41     /// [BufferPool] and sent over to the [BufferSubscriber].
send_next_frame(&mut self, present_time: i64) -> bool42     pub fn send_next_frame(&mut self, present_time: i64) -> bool {
43         if let Some(subscriber) = self.subscriber.as_mut() {
44             if self.subscription.take_request() {
45                 if let Some(buffer) = self.buffer_pool.next_buffer() {
46                     let frame = Frame { buffer, present_time, fence: 0 };
47 
48                     subscriber.on_next(frame);
49                     return true;
50                 }
51             }
52         }
53         false
54     }
55 }
56 
57 impl BufferPublisher for BufferPoolPublisher {
get_publisher_stream_config(&self) -> StreamConfig58     fn get_publisher_stream_config(&self) -> StreamConfig {
59         self.stream_config
60     }
61 
subscribe(&mut self, subscriber: impl BufferSubscriber + 'static)62     fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) {
63         assert!(self.subscriber.is_none());
64 
65         self.subscriber = Some(Box::new(subscriber));
66         self.subscriber.as_mut().unwrap().on_subscribe(self.subscription.clone_for_subscriber());
67     }
68 }
69 
70 #[cfg(test)]
71 mod test {
72     use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};
73 
74     use super::*;
75 
76     use crate::{
77         subscribers::{
78             testing::{TestSubscriber, TestingSubscriberEvent},
79             SharedSubscriber,
80         },
81         StreamConfig,
82     };
83 
84     const STREAM_CONFIG: StreamConfig = StreamConfig {
85         width: 1,
86         height: 1,
87         layers: 1,
88         format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
89         usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
90         stride: 0,
91     };
92 
93     #[test]
test_send_next_frame()94     fn test_send_next_frame() {
95         let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
96 
97         let mut buffer_pool_publisher = BufferPoolPublisher::new(STREAM_CONFIG, 1).unwrap();
98         buffer_pool_publisher.subscribe(subscriber.clone());
99 
100         subscriber.map_inner(|s| s.request(1));
101 
102         assert!(buffer_pool_publisher.send_next_frame(1));
103 
104         let events = subscriber.map_inner_mut(|s| s.take_events());
105         assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
106         assert_eq!(buffer_pool_publisher.subscription.pending_requests(), 0);
107     }
108 }
109