1 // Copyright 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "PixelStreamManager.h"
15 
16 #include <android-base/logging.h>
17 #include <vndk/hardware_buffer.h>
18 
19 #include <algorithm>
20 #include <mutex>
21 #include <thread>
22 
23 #include "PixelFormatUtils.h"
24 
25 namespace android {
26 namespace automotive {
27 namespace computepipe {
28 namespace runner {
29 namespace stream_manager {
30 
PixelMemHandle(int bufferId,int streamId,int additionalUsageFlags)31 PixelMemHandle::PixelMemHandle(int bufferId, int streamId, int additionalUsageFlags)
32     : mBufferId(bufferId),
33       mStreamId(streamId),
34       mBuffer(nullptr),
35       mUsage(AHARDWAREBUFFER_USAGE_CPU_WRITE_OFTEN | additionalUsageFlags) {
36 }
37 
~PixelMemHandle()38 PixelMemHandle::~PixelMemHandle() {
39     if (mBuffer) {
40         AHardwareBuffer_release(mBuffer);
41     }
42 }
43 
getStreamId() const44 int PixelMemHandle::getStreamId() const {
45     return mStreamId;
46 }
47 
getType() const48 proto::PacketType PixelMemHandle::getType() const {
49     return proto::PacketType::PIXEL_DATA;
50 }
getTimeStamp() const51 uint64_t PixelMemHandle::getTimeStamp() const {
52     return mTimestamp;
53 }
54 
getSize() const55 uint32_t PixelMemHandle::getSize() const {
56     return 0;
57 }
58 
getData() const59 const char* PixelMemHandle::getData() const {
60     return nullptr;
61 }
62 
getHardwareBuffer() const63 AHardwareBuffer* PixelMemHandle::getHardwareBuffer() const {
64     return mBuffer;
65 }
66 
67 /* Sets frame info */
setFrameData(uint64_t timestamp,const InputFrame & inputFrame)68 Status PixelMemHandle::setFrameData(uint64_t timestamp, const InputFrame& inputFrame) {
69     // Allocate a new buffer if it is currently null.
70     FrameInfo frameInfo = inputFrame.getFrameInfo();
71     if (mBuffer == nullptr) {
72         mDesc.format = PixelFormatToHardwareBufferFormat(frameInfo.format);
73         mDesc.height = frameInfo.height;
74         mDesc.width = frameInfo.width;
75         mDesc.layers = 1;
76         mDesc.rfu0 = 0;
77         mDesc.rfu1 = 0;
78         mDesc.stride = frameInfo.stride;
79         mDesc.usage = mUsage;
80         int err = AHardwareBuffer_allocate(&mDesc, &mBuffer);
81 
82         if (err != 0 || mBuffer == nullptr) {
83             LOG(ERROR) << "Failed to allocate hardware buffer with error " << err;
84             return Status::NO_MEMORY;
85         }
86 
87         // Update mDesc with the actual descriptor with which the buffer was created. The actual
88         // stride could be different from the specified stride.
89         AHardwareBuffer_describe(mBuffer, &mDesc);
90     }
91 
92     // Verifies that the input frame data has the same type as the allocated buffer.
93     if (frameInfo.width != mDesc.width || frameInfo.height != mDesc.height ||
94         PixelFormatToHardwareBufferFormat(frameInfo.format) != mDesc.format) {
95         LOG(ERROR) << "Variable image sizes from the same stream id is not supported.";
96         return Status::INVALID_ARGUMENT;
97     }
98 
99     // Locks the frame for copying the input frame data.
100     void* mappedBuffer = nullptr;
101     int err = AHardwareBuffer_lock(mBuffer, AHARDWAREBUFFER_USAGE_CPU_WRITE_OFTEN, -1, nullptr,
102                                    &mappedBuffer);
103     if (err != 0 || mappedBuffer == nullptr) {
104         LOG(ERROR) << "Unable to lock a realased hardware buffer.";
105         return Status::INTERNAL_ERROR;
106     }
107 
108     // Copies the input frame data.
109     int bytesPerPixel = numBytesPerPixel(static_cast<AHardwareBuffer_Format>(mDesc.format));
110     // The stride for hardware buffer is specified in pixels while the stride
111     // for InputFrame data structure is specified in bytes.
112     if (mDesc.stride * bytesPerPixel == frameInfo.stride) {
113         memcpy(mappedBuffer, inputFrame.getFramePtr(), mDesc.stride * mDesc.height * bytesPerPixel);
114     } else {
115         for (int y = 0; y < frameInfo.height; y++) {
116             memcpy((uint8_t*)mappedBuffer + mDesc.stride * y * bytesPerPixel,
117                    inputFrame.getFramePtr() + y * frameInfo.stride,
118                    std::min(frameInfo.stride, mDesc.stride * bytesPerPixel));
119         }
120     }
121 
122     AHardwareBuffer_unlock(mBuffer, nullptr);
123     mTimestamp = timestamp;
124 
125     return Status::SUCCESS;
126 }
127 
getBufferId() const128 int PixelMemHandle::getBufferId() const {
129     return mBufferId;
130 }
131 
setEngineInterface(std::shared_ptr<StreamEngineInterface> engine)132 void PixelStreamManager::setEngineInterface(std::shared_ptr<StreamEngineInterface> engine) {
133     std::lock_guard lock(mLock);
134     mEngine = engine;
135 }
136 
setMaxInFlightPackets(uint32_t maxPackets)137 Status PixelStreamManager::setMaxInFlightPackets(uint32_t maxPackets) {
138     std::lock_guard lock(mLock);
139     if (mBuffersInUse.size() > maxPackets) {
140         LOG(ERROR) << "Cannot set max in flight packets after graph has already started.";
141         return Status::ILLEGAL_STATE;
142     }
143 
144     mMaxInFlightPackets = maxPackets;
145     std::lock_guard stateLock(mStateLock);
146     mState = CONFIG_DONE;
147     return Status::SUCCESS;
148 }
149 
freePacket(int bufferId)150 Status PixelStreamManager::freePacket(int bufferId) {
151     std::lock_guard lock(mLock);
152 
153     auto it = mBuffersInUse.find(bufferId);
154 
155     if (it == mBuffersInUse.end()) {
156         std::lock_guard stateLock(mStateLock);
157         // If the graph has already been stopped, we free the buffers
158         // asynchronously, so return SUCCESS if freePacket is called later.
159         if (mState == STOPPED) {
160             return Status::SUCCESS;
161         }
162 
163         LOG(ERROR) << "Unable to find the mem handle. Duplicate release may possible have been "
164                       "called";
165         return Status::INVALID_ARGUMENT;
166     }
167 
168     it->second.outstandingRefCount -= 1;
169     if (it->second.outstandingRefCount == 0) {
170         mBuffersReady.push_back(it->second.handle);
171         mBuffersInUse.erase(it);
172     }
173     return Status::SUCCESS;
174 }
175 
freeAllPackets()176 void PixelStreamManager::freeAllPackets() {
177     std::lock_guard lock(mLock);
178 
179     for (auto [bufferId, buffer] : mBuffersInUse) {
180         mBuffersReady.push_back(buffer.handle);
181     }
182     mBuffersInUse.clear();
183 }
184 
queuePacket(const char *,const uint32_t,uint64_t)185 Status PixelStreamManager::queuePacket(const char* /*data*/, const uint32_t /*size*/,
186                                        uint64_t /*timestamp*/) {
187     LOG(ERROR) << "Trying to queue a semantic packet to a pixel stream manager";
188     return Status::ILLEGAL_STATE;
189 }
190 
queuePacket(const InputFrame & frame,uint64_t timestamp)191 Status PixelStreamManager::queuePacket(const InputFrame& frame, uint64_t timestamp) {
192     std::lock_guard lock(mLock);
193 
194     // State has to be running for the callback to go back.
195     {
196         std::lock_guard stateLock(mStateLock);
197         if (mState != RUNNING) {
198             LOG(ERROR) << "Packet cannot be queued when state is not RUNNING. Current state is"
199                        << mState;
200             return Status::ILLEGAL_STATE;
201         }
202     }
203 
204     if (mEngine == nullptr) {
205         LOG(ERROR) << "Stream to engine interface is not set";
206         return Status::ILLEGAL_STATE;
207     }
208 
209     if (mBuffersInUse.size() >= mMaxInFlightPackets) {
210         LOG(INFO) << "Too many frames in flight. Skipping frame at timestamp " << timestamp;
211         return Status::SUCCESS;
212     }
213 
214     // A unique id per buffer is maintained by incrementing the unique id from the previously
215     // created buffer. The unique id is therefore the number of buffers already created.
216     if (mBuffersReady.empty()) {
217         mBuffersReady.push_back(std::make_shared<PixelMemHandle>(mBuffersInUse.size(), mStreamId));
218     }
219 
220     // The previously used buffer is pushed to the back of the vector. Picking the last used buffer
221     // may be more cache efficient if accessing through CPU, so we use that strategy here.
222     std::shared_ptr<PixelMemHandle> memHandle = mBuffersReady[mBuffersReady.size() - 1];
223     mBuffersReady.resize(mBuffersReady.size() - 1);
224 
225     BufferMetadata bufferMetadata;
226     bufferMetadata.outstandingRefCount = 1;
227     bufferMetadata.handle = memHandle;
228 
229     mBuffersInUse.emplace(memHandle->getBufferId(), bufferMetadata);
230 
231     Status status = memHandle->setFrameData(timestamp, frame);
232     if (status != Status::SUCCESS) {
233         LOG(ERROR) << "Setting frame data failed with error code " << status;
234         return status;
235     }
236 
237     // Dispatch packet to the engine asynchronously in order to avoid circularly
238     // waiting for each others' locks.
239     std::thread t([this, memHandle]() {
240         Status status = mEngine->dispatchPacket(memHandle);
241         if (status != Status::SUCCESS) {
242             mEngine->notifyError(std::string(__func__) + ":" + std::to_string(__LINE__) +
243                                  " Failed to dispatch packet");
244         }
245     });
246     t.detach();
247     return Status::SUCCESS;
248 }
249 
handleExecutionPhase(const RunnerEvent & e)250 Status PixelStreamManager::handleExecutionPhase(const RunnerEvent& e) {
251     std::lock_guard<std::mutex> lock(mStateLock);
252     if (mState == CONFIG_DONE && e.isPhaseEntry()) {
253         mState = RUNNING;
254         return Status::SUCCESS;
255     }
256     if (mState == RESET) {
257         // Cannot get to running phase from reset state without config phase
258         return Status::ILLEGAL_STATE;
259     }
260     if (mState == RUNNING && e.isAborted()) {
261         // Transition back to config completed
262         mState = CONFIG_DONE;
263         return Status::SUCCESS;
264     }
265     if (mState == RUNNING) {
266         return Status::ILLEGAL_STATE;
267     }
268     return Status::SUCCESS;
269 }
270 
handleStopWithFlushPhase(const RunnerEvent & e)271 Status PixelStreamManager::handleStopWithFlushPhase(const RunnerEvent& e) {
272     return handleStopImmediatePhase(e);
273 }
274 
handleStopImmediatePhase(const RunnerEvent & e)275 Status PixelStreamManager::handleStopImmediatePhase(const RunnerEvent& e) {
276     std::lock_guard<std::mutex> lock(mStateLock);
277     if (mState == CONFIG_DONE || mState == RESET) {
278         return ILLEGAL_STATE;
279     }
280     /* Cannot have stop completed if we never entered stop state */
281     if (mState == RUNNING && (e.isAborted() || e.isTransitionComplete())) {
282         return ILLEGAL_STATE;
283     }
284     /* We are being asked to stop */
285     if (mState == RUNNING && e.isPhaseEntry()) {
286         mState = STOPPED;
287         std::thread t([this]() {
288             freeAllPackets();
289             mEngine->notifyEndOfStream();
290         });
291         t.detach();
292         return SUCCESS;
293     }
294     /* Other Components have stopped, we can transition back to CONFIG_DONE */
295     if (mState == STOPPED && e.isTransitionComplete()) {
296         mState = CONFIG_DONE;
297         return SUCCESS;
298     }
299     /* We were stopped, but stop was aborted. */
300     if (mState == STOPPED && e.isAborted()) {
301         mState = RUNNING;
302         return SUCCESS;
303     }
304     return SUCCESS;
305 }
306 
clonePacket(std::shared_ptr<MemHandle> handle)307 std::shared_ptr<MemHandle> PixelStreamManager::clonePacket(std::shared_ptr<MemHandle> handle) {
308     if (!handle) {
309         LOG(ERROR) << "PixelStreamManager - Received null memhandle.";
310         return nullptr;
311     }
312 
313     std::lock_guard<std::mutex> lock(mLock);
314     int bufferId = handle->getBufferId();
315     auto it = mBuffersInUse.find(bufferId);
316     if (it == mBuffersInUse.end()) {
317         LOG(ERROR) << "PixelStreamManager - Attempting to clone an already freed packet.";
318         return nullptr;
319     }
320     it->second.outstandingRefCount += 1;
321     return handle;
322 }
323 
PixelStreamManager(std::string name,int streamId)324 PixelStreamManager::PixelStreamManager(std::string name, int streamId)
325     : StreamManager(name, proto::PacketType::PIXEL_DATA), mStreamId(streamId) {
326 }
327 
328 }  // namespace stream_manager
329 }  // namespace runner
330 }  // namespace computepipe
331 }  // namespace automotive
332 }  // namespace android
333