1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolStatus"
18 //#define LOG_NDEBUG 0
19 
20 #include <time.h>
21 #include "BufferStatus.h"
22 
23 namespace android {
24 namespace hardware {
25 namespace media {
26 namespace bufferpool {
27 namespace V1_0 {
28 namespace implementation {
29 
getTimestampNow()30 int64_t getTimestampNow() {
31     int64_t stamp;
32     struct timespec ts;
33     // TODO: CLOCK_MONOTONIC_COARSE?
34     clock_gettime(CLOCK_MONOTONIC, &ts);
35     stamp = ts.tv_nsec / 1000;
36     stamp += (ts.tv_sec * 1000000LL);
37     return stamp;
38 }
39 
40 static constexpr int kNumElementsInQueue = 1024*16;
41 
open(ConnectionId id,const QueueDescriptor ** fmqDescPtr)42 ResultStatus BufferStatusObserver::open(
43         ConnectionId id, const QueueDescriptor** fmqDescPtr) {
44     if (mBufferStatusQueues.find(id) != mBufferStatusQueues.end()) {
45         // TODO: id collision log?
46         return ResultStatus::CRITICAL_ERROR;
47     }
48     std::unique_ptr<BufferStatusQueue> queue =
49             std::make_unique<BufferStatusQueue>(kNumElementsInQueue);
50     if (!queue || queue->isValid() == false) {
51         *fmqDescPtr = NULL;
52         return ResultStatus::NO_MEMORY;
53     } else {
54         *fmqDescPtr = queue->getDesc();
55     }
56     auto result = mBufferStatusQueues.insert(
57             std::make_pair(id, std::move(queue)));
58     if (!result.second) {
59         *fmqDescPtr = NULL;
60         return ResultStatus::NO_MEMORY;
61     }
62     return ResultStatus::OK;
63 }
64 
close(ConnectionId id)65 ResultStatus BufferStatusObserver::close(ConnectionId id) {
66     if (mBufferStatusQueues.find(id) == mBufferStatusQueues.end()) {
67         return ResultStatus::CRITICAL_ERROR;
68     }
69     mBufferStatusQueues.erase(id);
70     return ResultStatus::OK;
71 }
72 
getBufferStatusChanges(std::vector<BufferStatusMessage> & messages)73 void BufferStatusObserver::getBufferStatusChanges(std::vector<BufferStatusMessage> &messages) {
74     for (auto it = mBufferStatusQueues.begin(); it != mBufferStatusQueues.end(); ++it) {
75         BufferStatusMessage message;
76         size_t avail = it->second->availableToRead();
77         while (avail > 0) {
78             if (!it->second->read(&message, 1)) {
79                 // Since avaliable # of reads are already confirmed,
80                 // this should not happen.
81                 // TODO: error handling (spurious client?)
82                 ALOGW("FMQ message cannot be read from %lld", (long long)it->first);
83                 return;
84             }
85             message.connectionId = it->first;
86             messages.push_back(message);
87             --avail;
88         }
89     }
90 }
91 
BufferStatusChannel(const QueueDescriptor & fmqDesc)92 BufferStatusChannel::BufferStatusChannel(
93         const QueueDescriptor &fmqDesc) {
94     std::unique_ptr<BufferStatusQueue> queue =
95             std::make_unique<BufferStatusQueue>(fmqDesc);
96     if (!queue || queue->isValid() == false) {
97         mValid = false;
98         return;
99     }
100     mValid  = true;
101     mBufferStatusQueue = std::move(queue);
102 }
103 
isValid()104 bool BufferStatusChannel::isValid() {
105     return mValid;
106 }
107 
postBufferRelease(ConnectionId connectionId,std::list<BufferId> & pending,std::list<BufferId> & posted)108 void BufferStatusChannel::postBufferRelease(
109         ConnectionId connectionId,
110         std::list<BufferId> &pending, std::list<BufferId> &posted) {
111     if (mValid && pending.size() > 0) {
112         size_t avail = mBufferStatusQueue->availableToWrite();
113         avail = std::min(avail, pending.size());
114         BufferStatusMessage message;
115         for (size_t i = 0 ; i < avail; ++i) {
116             BufferId id = pending.front();
117             message.newStatus = BufferStatus::NOT_USED;
118             message.bufferId = id;
119             message.connectionId = connectionId;
120             if (!mBufferStatusQueue->write(&message, 1)) {
121                 // Since avaliable # of writes are already confirmed,
122                 // this should not happen.
123                 // TODO: error handing?
124                 ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
125                 return;
126             }
127             pending.pop_front();
128             posted.push_back(id);
129         }
130     }
131 }
132 
postBufferStatusMessage(TransactionId transactionId,BufferId bufferId,BufferStatus status,ConnectionId connectionId,ConnectionId targetId,std::list<BufferId> & pending,std::list<BufferId> & posted)133 bool BufferStatusChannel::postBufferStatusMessage(
134         TransactionId transactionId, BufferId bufferId,
135         BufferStatus status, ConnectionId connectionId, ConnectionId targetId,
136         std::list<BufferId> &pending, std::list<BufferId> &posted) {
137     if (mValid) {
138         size_t avail = mBufferStatusQueue->availableToWrite();
139         size_t numPending = pending.size();
140         if (avail >= numPending + 1) {
141             BufferStatusMessage release, message;
142             for (size_t i = 0; i < numPending; ++i) {
143                 BufferId id = pending.front();
144                 release.newStatus = BufferStatus::NOT_USED;
145                 release.bufferId = id;
146                 release.connectionId = connectionId;
147                 if (!mBufferStatusQueue->write(&release, 1)) {
148                     // Since avaliable # of writes are already confirmed,
149                     // this should not happen.
150                     // TODO: error handling?
151                     ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
152                     return false;
153                 }
154                 pending.pop_front();
155                 posted.push_back(id);
156             }
157             message.transactionId = transactionId;
158             message.bufferId = bufferId;
159             message.newStatus = status;
160             message.connectionId = connectionId;
161             message.targetConnectionId = targetId;
162             // TODO : timesatamp
163             message.timestampUs = 0;
164             if (!mBufferStatusQueue->write(&message, 1)) {
165                 // Since avaliable # of writes are already confirmed,
166                 // this should not happen.
167                 ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
168                 return false;
169             }
170             return true;
171         }
172     }
173     return false;
174 }
175 
176 }  // namespace implementation
177 }  // namespace V1_0
178 }  // namespace bufferpool
179 }  // namespace media
180 }  // namespace hardware
181 }  // namespace android
182 
183