1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! libbufferstreams: Reactive Streams for Graphics Buffers
16 
17 pub mod buffers;
18 pub mod publishers;
19 mod stream_config;
20 pub mod subscribers;
21 pub mod subscriptions;
22 
23 use buffers::Buffer;
24 pub use stream_config::*;
25 
26 /// This function will print Hello World.
27 #[no_mangle]
hello() -> bool28 pub extern "C" fn hello() -> bool {
29     println!("Hello world.");
30     true
31 }
32 
33 /// BufferPublishers provide buffers to BufferSusbscribers. Depending on the
34 /// particular object in question, these could be allocated locally or provided
35 /// over IPC.
36 ///
37 /// BufferPublishers are required to adhere to the following, based on the
38 /// reactive streams specification:
39 /// * The total number of on_next´s signalled by a Publisher to a Subscriber
40 /// MUST be less than or equal to the total number of elements requested by that
41 /// Subscriber´s Subscription at all times.
42 /// * A Publisher MAY signal fewer on_next than requested and terminate the
43 /// Subscription by calling on_complete or on_error.
44 /// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
45 /// MUST be signaled serially.
46 /// * If a Publisher fails it MUST signal an on_error.
47 /// * If a Publisher terminates successfully (finite stream) it MUST signal an
48 /// on_complete.
49 /// * If a Publisher signals either on_error or on_complete on a Subscriber,
50 /// that Subscriber’s Subscription MUST be considered cancelled.
51 /// * Once a terminal state has been signaled (on_error, on_complete) it is
52 /// REQUIRED that no further signals occur.
53 /// * If a Subscription is cancelled its Subscriber MUST eventually stop being
54 ///  signaled.
55 /// * A Publisher MAY support multiple Subscribers and decides whether each
56 /// Subscription is unicast or multicast.
57 pub trait BufferPublisher {
58     /// Returns the StreamConfig of buffers that publisher creates.
get_publisher_stream_config(&self) -> StreamConfig59     fn get_publisher_stream_config(&self) -> StreamConfig;
60     /// This function will create the subscription between the publisher and
61     /// the subscriber.
subscribe(&mut self, subscriber: impl BufferSubscriber + 'static)62     fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
63 }
64 
65 /// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
66 /// via the BufferSubscription they get from the publisher, then receive Frames
67 /// via on_next.
68 ///
69 /// BufferSubcribers are required to adhere to the following, based on the
70 /// reactive streams specification:
71 /// * The total number of on_next´s signalled by a Publisher to a Subscriber
72 /// MUST be less than or equal to the total number of elements requested by that
73 /// Subscriber´s Subscription at all times.
74 /// * A Publisher MAY signal fewer on_next than requested and terminate the
75 /// Subscription by calling on_complete or on_error.
76 /// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
77 /// MUST be signaled serially.
78 /// * If a Publisher fails it MUST signal an on_error.
79 /// * If a Publisher terminates successfully (finite stream) it MUST signal an
80 /// on_complete.
81 /// * If a Publisher signals either on_error or on_complete on a Subscriber,
82 /// that Subscriber’s Subscription MUST be considered cancelled.
83 /// * Once a terminal state has been signaled (on_error, on_complete) it is
84 /// REQUIRED that no further signals occur.
85 /// * If a Subscription is cancelled its Subscriber MUST eventually stop being
86 /// signaled.
87 /// * Publisher.subscribe MAY be called as many times as wanted but MUST be
88 /// with a different Subscriber each time.
89 /// * A Publisher MAY support multiple Subscribers and decides whether each
90 /// Subscription is unicast or multicast.
91 pub trait BufferSubscriber {
92     /// The StreamConfig of buffers that this subscriber expects.
get_subscriber_stream_config(&self) -> StreamConfig93     fn get_subscriber_stream_config(&self) -> StreamConfig;
94     /// This function will be called at the beginning of the subscription.
on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>)95     fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
96     /// This function will be called for buffer that comes in.
on_next(&mut self, frame: Frame)97     fn on_next(&mut self, frame: Frame);
98     /// This function will be called in case of an error.
on_error(&mut self, error: BufferError)99     fn on_error(&mut self, error: BufferError);
100     /// This function will be called on finite streams when done.
on_complete(&mut self)101     fn on_complete(&mut self);
102 }
103 
104 /// BufferSubscriptions serve as the bridge between BufferPublishers and
105 /// BufferSubscribers. BufferSubscribers receive a BufferSubscription when they
106 /// subscribe to a BufferPublisher via on_subscribe.
107 ///
108 /// This object is used by the BufferSubscriber to cancel its subscription
109 /// or request more buffers.
110 ///
111 /// BufferSubcriptions are required to adhere to the following, based on the
112 /// reactive streams specification:
113 /// * Subscription.request and Subscription.cancel MUST only be called inside
114 /// of its Subscriber context.
115 /// * The Subscription MUST allow the Subscriber to call Subscription.request
116 /// synchronously from within on_next or on_subscribe.
117 /// * Subscription.request MUST place an upper bound on possible synchronous
118 /// recursion between Publisher and Subscriber.
119 /// * Subscription.request SHOULD respect the responsivity of its caller by
120 /// returning in a timely manner.
121 /// * Subscription.cancel MUST respect the responsivity of its caller by
122 /// returning in a timely manner, MUST be idempotent and MUST be thread-safe.
123 /// * After the Subscription is cancelled, additional
124 /// Subscription.request(n: u64) MUST be NOPs.
125 /// * After the Subscription is cancelled, additional Subscription.cancel()
126 /// MUST be NOPs.
127 /// * While the Subscription is not cancelled, Subscription.request(n: u64)
128 /// MUST register the given number of additional elements to be produced to the
129 /// respective subscriber.
130 /// * While the Subscription is not cancelled, Subscription.request(n: u64)
131 /// MUST signal on_error if the argument is <= 0. The cause message SHOULD
132 /// explain that non-positive request signals are illegal.
133 /// * While the Subscription is not cancelled, Subscription.request(n: u64)
134 /// MAY synchronously call on_next on this (or other) subscriber(s).
135 /// * While the Subscription is not cancelled, Subscription.request(n: u64)
136 /// MAY synchronously call on_complete or on_error on this (or other)
137 /// subscriber(s).
138 /// * While the Subscription is not cancelled, Subscription.cancel() MUST
139 /// request the Publisher to eventually stop signaling its Subscriber. The
140 /// operation is NOT REQUIRED to affect the Subscription immediately.
141 /// * While the Subscription is not cancelled, Subscription.cancel() MUST
142 /// request the Publisher to eventually drop any references to the corresponding
143 /// subscriber.
144 /// * While the Subscription is not cancelled, calling Subscription.cancel MAY
145 /// cause the Publisher, if stateful, to transition into the shut-down state if
146 /// no other Subscription exists at this point.
147 /// * Calling Subscription.cancel MUST return normally.
148 /// * Calling Subscription.request MUST return normally.
149 pub trait BufferSubscription: Send + Sync + 'static {
150     /// request
request(&self, n: u64)151     fn request(&self, n: u64);
152     /// cancel
cancel(&self)153     fn cancel(&self);
154 }
155 
156 /// Type used to describe errors produced by subscriptions.
157 pub type BufferError = anyhow::Error;
158 
159 /// Struct used to contain the buffer.
160 pub struct Frame {
161     /// A buffer to be used this frame.
162     pub buffer: Buffer,
163     /// The time at which this buffer is expected to be displayed.
164     pub present_time: i64,
165     /// A fence used for reading/writing safely.
166     pub fence: i32,
167 }
168 
169 #[cfg(test)]
170 mod test {
171     #![allow(warnings, unused)]
172     use super::*;
173 
174     use anyhow::anyhow;
175     use buffers::Buffer;
176     use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};
177     use std::{borrow::BorrowMut, error::Error, ops::Add, sync::Arc};
178 
179     use crate::{
180         publishers::testing::*,
181         subscribers::{testing::*, SharedSubscriber},
182     };
183 
184     const STREAM_CONFIG: StreamConfig = StreamConfig {
185         width: 1,
186         height: 1,
187         layers: 1,
188         format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
189         usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
190         stride: 0,
191     };
192 
make_frame() -> Frame193     fn make_frame() -> Frame {
194         Frame {
195             buffer: Buffer::new_unowned(
196                 STREAM_CONFIG
197                     .create_hardware_buffer()
198                     .expect("Unable to create hardware buffer for test"),
199             ),
200             present_time: 1,
201             fence: 0,
202         }
203     }
204 
205     #[test]
test_test_implementations_next()206     fn test_test_implementations_next() {
207         let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
208         let mut publisher = TestPublisher::new(STREAM_CONFIG);
209 
210         publisher.subscribe(subscriber.clone());
211         assert!(subscriber.map_inner(|s| s.has_subscription()));
212         assert!(publisher.has_subscriber());
213 
214         publisher.send_frame(make_frame());
215         let events = subscriber.map_inner_mut(|s| s.take_events());
216         assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
217 
218         subscriber.map_inner(|s| s.request(1));
219         assert_eq!(publisher.pending_requests(), 1);
220 
221         publisher.send_frame(make_frame());
222         let events = subscriber.map_inner_mut(|s| s.take_events());
223         assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
224         assert_eq!(publisher.pending_requests(), 0);
225     }
226 
227     #[test]
test_test_implementations_complete()228     fn test_test_implementations_complete() {
229         let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
230         let mut publisher = TestPublisher::new(STREAM_CONFIG);
231 
232         publisher.subscribe(subscriber.clone());
233         assert!(subscriber.map_inner(|s| s.has_subscription()));
234         assert!(publisher.has_subscriber());
235 
236         publisher.send_complete();
237         let events = subscriber.map_inner_mut(|s| s.take_events());
238         assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete));
239     }
240 
241     #[test]
test_test_implementations_error()242     fn test_test_implementations_error() {
243         let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
244         let mut publisher = TestPublisher::new(STREAM_CONFIG);
245 
246         publisher.subscribe(subscriber.clone());
247         assert!(subscriber.map_inner(|s| s.has_subscription()));
248         assert!(publisher.has_subscriber());
249 
250         publisher.send_error(anyhow!("error"));
251         let events = subscriber.map_inner_mut(|s| s.take_events());
252         assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_)));
253     }
254 }
255