1 /*
2  * Copyright 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <inttypes.h>
18 
19 #define LOG_TAG "StreamSplitter"
20 #define ATRACE_TAG ATRACE_TAG_GRAPHICS
21 //#define LOG_NDEBUG 0
22 
23 #include <gui/BufferItem.h>
24 #include <gui/IGraphicBufferConsumer.h>
25 #include <gui/IGraphicBufferProducer.h>
26 #include <gui/StreamSplitter.h>
27 
28 #include <ui/GraphicBuffer.h>
29 
30 #include <binder/ProcessState.h>
31 
32 #include <utils/Trace.h>
33 
34 #include <system/window.h>
35 
36 namespace android {
37 
createSplitter(const sp<IGraphicBufferConsumer> & inputQueue,sp<StreamSplitter> * outSplitter)38 status_t StreamSplitter::createSplitter(
39         const sp<IGraphicBufferConsumer>& inputQueue,
40         sp<StreamSplitter>* outSplitter) {
41     if (inputQueue == nullptr) {
42         ALOGE("createSplitter: inputQueue must not be NULL");
43         return BAD_VALUE;
44     }
45     if (outSplitter == nullptr) {
46         ALOGE("createSplitter: outSplitter must not be NULL");
47         return BAD_VALUE;
48     }
49 
50     sp<StreamSplitter> splitter(new StreamSplitter(inputQueue));
51     status_t status = splitter->mInput->consumerConnect(splitter, false);
52     if (status == NO_ERROR) {
53         splitter->mInput->setConsumerName(String8("StreamSplitter"));
54         *outSplitter = splitter;
55     }
56     return status;
57 }
58 
StreamSplitter(const sp<IGraphicBufferConsumer> & inputQueue)59 StreamSplitter::StreamSplitter(const sp<IGraphicBufferConsumer>& inputQueue)
60       : mIsAbandoned(false), mMutex(), mReleaseCondition(),
61         mOutstandingBuffers(0), mInput(inputQueue), mOutputs(), mBuffers() {}
62 
~StreamSplitter()63 StreamSplitter::~StreamSplitter() {
64     mInput->consumerDisconnect();
65     Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
66     for (; output != mOutputs.end(); ++output) {
67         (*output)->disconnect(NATIVE_WINDOW_API_CPU);
68     }
69 
70     if (mBuffers.size() > 0) {
71         ALOGE("%zu buffers still being tracked", mBuffers.size());
72     }
73 }
74 
addOutput(const sp<IGraphicBufferProducer> & outputQueue)75 status_t StreamSplitter::addOutput(
76         const sp<IGraphicBufferProducer>& outputQueue) {
77     if (outputQueue == nullptr) {
78         ALOGE("addOutput: outputQueue must not be NULL");
79         return BAD_VALUE;
80     }
81 
82     Mutex::Autolock lock(mMutex);
83 
84     IGraphicBufferProducer::QueueBufferOutput queueBufferOutput;
85     sp<OutputListener> listener(new OutputListener(this, outputQueue));
86     IInterface::asBinder(outputQueue)->linkToDeath(listener);
87     status_t status = outputQueue->connect(listener, NATIVE_WINDOW_API_CPU,
88             /* producerControlledByApp */ false, &queueBufferOutput);
89     if (status != NO_ERROR) {
90         ALOGE("addOutput: failed to connect (%d)", status);
91         return status;
92     }
93 
94     mOutputs.push_back(outputQueue);
95 
96     return NO_ERROR;
97 }
98 
setName(const String8 & name)99 void StreamSplitter::setName(const String8 &name) {
100     Mutex::Autolock lock(mMutex);
101     mInput->setConsumerName(name);
102 }
103 
onFrameAvailable(const BufferItem &)104 void StreamSplitter::onFrameAvailable(const BufferItem& /* item */) {
105     ATRACE_CALL();
106     Mutex::Autolock lock(mMutex);
107 
108     // The current policy is that if any one consumer is consuming buffers too
109     // slowly, the splitter will stall the rest of the outputs by not acquiring
110     // any more buffers from the input. This will cause back pressure on the
111     // input queue, slowing down its producer.
112 
113     // If there are too many outstanding buffers, we block until a buffer is
114     // released back to the input in onBufferReleased
115     while (mOutstandingBuffers >= MAX_OUTSTANDING_BUFFERS) {
116         mReleaseCondition.wait(mMutex);
117 
118         // If the splitter is abandoned while we are waiting, the release
119         // condition variable will be broadcast, and we should just return
120         // without attempting to do anything more (since the input queue will
121         // also be abandoned).
122         if (mIsAbandoned) {
123             return;
124         }
125     }
126     ++mOutstandingBuffers;
127 
128     // Acquire and detach the buffer from the input
129     BufferItem bufferItem;
130     status_t status = mInput->acquireBuffer(&bufferItem, /* presentWhen */ 0);
131     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
132             "acquiring buffer from input failed (%d)", status);
133 
134     ALOGV("acquired buffer %#" PRIx64 " from input",
135             bufferItem.mGraphicBuffer->getId());
136 
137     status = mInput->detachBuffer(bufferItem.mSlot);
138     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
139             "detaching buffer from input failed (%d)", status);
140 
141     // Initialize our reference count for this buffer
142     mBuffers.add(bufferItem.mGraphicBuffer->getId(),
143             new BufferTracker(bufferItem.mGraphicBuffer));
144 
145     IGraphicBufferProducer::QueueBufferInput queueInput(
146             bufferItem.mTimestamp, bufferItem.mIsAutoTimestamp,
147             bufferItem.mDataSpace, bufferItem.mCrop,
148             static_cast<int32_t>(bufferItem.mScalingMode),
149             bufferItem.mTransform, bufferItem.mFence);
150 
151     // Attach and queue the buffer to each of the outputs
152     Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
153     for (; output != mOutputs.end(); ++output) {
154         int slot;
155         status = (*output)->attachBuffer(&slot, bufferItem.mGraphicBuffer);
156         if (status == NO_INIT) {
157             // If we just discovered that this output has been abandoned, note
158             // that, increment the release count so that we still release this
159             // buffer eventually, and move on to the next output
160             onAbandonedLocked();
161             mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
162                     incrementReleaseCountLocked();
163             continue;
164         } else {
165             LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
166                     "attaching buffer to output failed (%d)", status);
167         }
168 
169         IGraphicBufferProducer::QueueBufferOutput queueOutput;
170         status = (*output)->queueBuffer(slot, queueInput, &queueOutput);
171         if (status == NO_INIT) {
172             // If we just discovered that this output has been abandoned, note
173             // that, increment the release count so that we still release this
174             // buffer eventually, and move on to the next output
175             onAbandonedLocked();
176             mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
177                     incrementReleaseCountLocked();
178             continue;
179         } else {
180             LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
181                     "queueing buffer to output failed (%d)", status);
182         }
183 
184         ALOGV("queued buffer %#" PRIx64 " to output %p",
185                 bufferItem.mGraphicBuffer->getId(), output->get());
186     }
187 }
188 
onBufferReleasedByOutput(const sp<IGraphicBufferProducer> & from)189 void StreamSplitter::onBufferReleasedByOutput(
190         const sp<IGraphicBufferProducer>& from) {
191     ATRACE_CALL();
192     Mutex::Autolock lock(mMutex);
193 
194     sp<GraphicBuffer> buffer;
195     sp<Fence> fence;
196     status_t status = from->detachNextBuffer(&buffer, &fence);
197     if (status == NO_INIT) {
198         // If we just discovered that this output has been abandoned, note that,
199         // but we can't do anything else, since buffer is invalid
200         onAbandonedLocked();
201         return;
202     } else {
203         LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
204                 "detaching buffer from output failed (%d)", status);
205     }
206 
207     ALOGV("detached buffer %#" PRIx64 " from output %p",
208           buffer->getId(), from.get());
209 
210     const sp<BufferTracker>& tracker = mBuffers.editValueFor(buffer->getId());
211 
212     // Merge the release fence of the incoming buffer so that the fence we send
213     // back to the input includes all of the outputs' fences
214     tracker->mergeFence(fence);
215 
216     // Check to see if this is the last outstanding reference to this buffer
217     size_t releaseCount = tracker->incrementReleaseCountLocked();
218     ALOGV("buffer %#" PRIx64 " reference count %zu (of %zu)", buffer->getId(),
219             releaseCount, mOutputs.size());
220     if (releaseCount < mOutputs.size()) {
221         return;
222     }
223 
224     // If we've been abandoned, we can't return the buffer to the input, so just
225     // stop tracking it and move on
226     if (mIsAbandoned) {
227         mBuffers.removeItem(buffer->getId());
228         return;
229     }
230 
231     // Attach and release the buffer back to the input
232     int consumerSlot;
233     status = mInput->attachBuffer(&consumerSlot, tracker->getBuffer());
234     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
235             "attaching buffer to input failed (%d)", status);
236 
237     status = mInput->releaseBuffer(consumerSlot, /* frameNumber */ 0,
238             EGL_NO_DISPLAY, EGL_NO_SYNC_KHR, tracker->getMergedFence());
239     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
240             "releasing buffer to input failed (%d)", status);
241 
242     ALOGV("released buffer %#" PRIx64 " to input", buffer->getId());
243 
244     // We no longer need to track the buffer once it has been returned to the
245     // input
246     mBuffers.removeItem(buffer->getId());
247 
248     // Notify any waiting onFrameAvailable calls
249     --mOutstandingBuffers;
250     mReleaseCondition.signal();
251 }
252 
onAbandonedLocked()253 void StreamSplitter::onAbandonedLocked() {
254     ALOGE("one of my outputs has abandoned me");
255     if (!mIsAbandoned) {
256         mInput->consumerDisconnect();
257     }
258     mIsAbandoned = true;
259     mReleaseCondition.broadcast();
260 }
261 
OutputListener(const sp<StreamSplitter> & splitter,const sp<IGraphicBufferProducer> & output)262 StreamSplitter::OutputListener::OutputListener(
263         const sp<StreamSplitter>& splitter,
264         const sp<IGraphicBufferProducer>& output)
265       : mSplitter(splitter), mOutput(output) {}
266 
~OutputListener()267 StreamSplitter::OutputListener::~OutputListener() {}
268 
onBufferReleased()269 void StreamSplitter::OutputListener::onBufferReleased() {
270     mSplitter->onBufferReleasedByOutput(mOutput);
271 }
272 
binderDied(const wp<IBinder> &)273 void StreamSplitter::OutputListener::binderDied(const wp<IBinder>& /* who */) {
274     Mutex::Autolock lock(mSplitter->mMutex);
275     mSplitter->onAbandonedLocked();
276 }
277 
BufferTracker(const sp<GraphicBuffer> & buffer)278 StreamSplitter::BufferTracker::BufferTracker(const sp<GraphicBuffer>& buffer)
279       : mBuffer(buffer), mMergedFence(Fence::NO_FENCE), mReleaseCount(0) {}
280 
~BufferTracker()281 StreamSplitter::BufferTracker::~BufferTracker() {}
282 
mergeFence(const sp<Fence> & with)283 void StreamSplitter::BufferTracker::mergeFence(const sp<Fence>& with) {
284     mMergedFence = Fence::merge(String8("StreamSplitter"), mMergedFence, with);
285 }
286 
287 } // namespace android
288