1 // Copyright (C) 2023 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 use crate::{ 16 buffers::BufferPool, subscriptions::SharedBufferSubscription, BufferPublisher, 17 BufferSubscriber, Frame, StreamConfig, 18 }; 19 20 /// The [BufferPoolPublisher] submits buffers from a pool over to the subscriber. 21 pub struct BufferPoolPublisher { 22 stream_config: StreamConfig, 23 buffer_pool: BufferPool, 24 subscription: SharedBufferSubscription, 25 subscriber: Option<Box<dyn BufferSubscriber>>, 26 } 27 28 impl BufferPoolPublisher { 29 /// The [BufferPoolPublisher] needs to initialize a [BufferPool], the [BufferPool] will create 30 /// all buffers at initialization using the stream_config. new(stream_config: StreamConfig, size: usize) -> Option<Self>31 pub fn new(stream_config: StreamConfig, size: usize) -> Option<Self> { 32 BufferPool::new(size, stream_config).map(|buffer_pool| Self { 33 stream_config, 34 buffer_pool, 35 subscription: SharedBufferSubscription::new(), 36 subscriber: None, 37 }) 38 } 39 40 /// If the [SharedBufferSubscription] is ready for a [Frame], a buffer will be requested from 41 /// [BufferPool] and sent over to the [BufferSubscriber]. send_next_frame(&mut self, present_time: i64) -> bool42 pub fn send_next_frame(&mut self, present_time: i64) -> bool { 43 if let Some(subscriber) = self.subscriber.as_mut() { 44 if self.subscription.take_request() { 45 if let Some(buffer) = self.buffer_pool.next_buffer() { 46 let frame = Frame { buffer, present_time, fence: 0 }; 47 48 subscriber.on_next(frame); 49 return true; 50 } 51 } 52 } 53 false 54 } 55 } 56 57 impl BufferPublisher for BufferPoolPublisher { get_publisher_stream_config(&self) -> StreamConfig58 fn get_publisher_stream_config(&self) -> StreamConfig { 59 self.stream_config 60 } 61 subscribe(&mut self, subscriber: impl BufferSubscriber + 'static)62 fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) { 63 assert!(self.subscriber.is_none()); 64 65 self.subscriber = Some(Box::new(subscriber)); 66 self.subscriber.as_mut().unwrap().on_subscribe(self.subscription.clone_for_subscriber()); 67 } 68 } 69 70 #[cfg(test)] 71 mod test { 72 use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags}; 73 74 use super::*; 75 76 use crate::{ 77 subscribers::{ 78 testing::{TestSubscriber, TestingSubscriberEvent}, 79 SharedSubscriber, 80 }, 81 StreamConfig, 82 }; 83 84 const STREAM_CONFIG: StreamConfig = StreamConfig { 85 width: 1, 86 height: 1, 87 layers: 1, 88 format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM, 89 usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN, 90 stride: 0, 91 }; 92 93 #[test] test_send_next_frame()94 fn test_send_next_frame() { 95 let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG)); 96 97 let mut buffer_pool_publisher = BufferPoolPublisher::new(STREAM_CONFIG, 1).unwrap(); 98 buffer_pool_publisher.subscribe(subscriber.clone()); 99 100 subscriber.map_inner(|s| s.request(1)); 101 102 assert!(buffer_pool_publisher.send_next_frame(1)); 103 104 let events = subscriber.map_inner_mut(|s| s.take_events()); 105 assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_))); 106 assert_eq!(buffer_pool_publisher.subscription.pending_requests(), 0); 107 } 108 } 109