1 /*
2 * Copyright (c) 2021, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "TelemetryServer.h"
18
19 #include "CarTelemetryImpl.h"
20 #include "RingBuffer.h"
21
22 #include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
23 #include <android-base/logging.h>
24
25 #include <inttypes.h> // for PRIu64 and friends
26
27 #include <cstdint>
28 #include <memory>
29
30 namespace android {
31 namespace automotive {
32 namespace telemetry {
33
34 namespace {
35
36 using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
37 using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
38 using ::aidl::android::frameworks::automotive::telemetry::CarData;
39 using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetryCallback;
40 using ::android::base::Error;
41 using ::android::base::Result;
42
// Looper message ID handled by MessageHandlerImpl; it triggers pushCarDataToListeners().
constexpr int kMsgPushCarDataToListener = 1;

// If ICarDataListener cannot accept data, the next push is delayed a little bit to allow
// the listener to recover.
constexpr std::chrono::seconds kPushCarDataFailureDelaySeconds{1};
48 } // namespace
49
TelemetryServer(LooperWrapper * looper,const std::chrono::nanoseconds & pushCarDataDelayNs,const int maxBufferSize)50 TelemetryServer::TelemetryServer(LooperWrapper* looper,
51 const std::chrono::nanoseconds& pushCarDataDelayNs,
52 const int maxBufferSize) :
53 mLooper(looper),
54 mPushCarDataDelayNs(pushCarDataDelayNs),
55 mRingBuffer(maxBufferSize),
56 mMessageHandler(new MessageHandlerImpl(this)) {}
57
setListener(const std::shared_ptr<ICarDataListener> & listener)58 void TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) {
59 const std::scoped_lock<std::mutex> lock(mMutex);
60 mCarDataListener = listener;
61 mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
62 kMsgPushCarDataToListener);
63 }
64
clearListener()65 void TelemetryServer::clearListener() {
66 const std::scoped_lock<std::mutex> lock(mMutex);
67 if (mCarDataListener == nullptr) {
68 return;
69 }
70 mCarDataListener = nullptr;
71 mLooper->removeMessages(mMessageHandler, kMsgPushCarDataToListener);
72 }
73
findCarDataIdsIntersection(const std::vector<int32_t> & ids)74 std::vector<int32_t> TelemetryServer::findCarDataIdsIntersection(const std::vector<int32_t>& ids) {
75 std::vector<int32_t> interestedIds;
76 for (int32_t id : ids) {
77 if (mCarDataIds.find(id) != mCarDataIds.end()) {
78 interestedIds.push_back(id);
79 }
80 }
81 return interestedIds;
82 }
83
addCarDataIds(const std::vector<int32_t> & ids)84 void TelemetryServer::addCarDataIds(const std::vector<int32_t>& ids) {
85 const std::scoped_lock<std::mutex> lock(mMutex);
86 mCarDataIds.insert(ids.cbegin(), ids.cend());
87 std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction> invokedCallbacks;
88 LOG(VERBOSE) << "Received addCarDataIds call from CarTelemetryService, notifying callbacks";
89 for (int32_t id : ids) {
90 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
91 // prevent out of range exception when calling unordered_map.at()
92 continue;
93 }
94 const auto& callbacksForId = mIdToCallbacksMap.at(id);
95 LOG(VERBOSE) << "Invoking " << callbacksForId.size() << " callbacks for ID=" << id;
96 for (const TelemetryCallback& tc : callbacksForId) {
97 if (invokedCallbacks.find(tc) != invokedCallbacks.end()) {
98 // skipping already invoked callbacks
99 continue;
100 }
101 invokedCallbacks.insert(tc);
102 ndk::ScopedAStatus status =
103 tc.callback->onChange(findCarDataIdsIntersection(tc.config.carDataIds));
104 if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
105 status.getStatus() == STATUS_DEAD_OBJECT) {
106 LOG(WARNING) << "Failed to invoke onChange() on a dead object, removing callback";
107 removeCallback(tc.callback);
108 }
109 }
110 }
111 }
112
removeCarDataIds(const std::vector<int32_t> & ids)113 void TelemetryServer::removeCarDataIds(const std::vector<int32_t>& ids) {
114 const std::scoped_lock<std::mutex> lock(mMutex);
115 for (int32_t id : ids) {
116 mCarDataIds.erase(id);
117 }
118 std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction> invokedCallbacks;
119 LOG(VERBOSE) << "Received removeCarDataIds call from CarTelemetryService, notifying callbacks";
120 for (int32_t id : ids) {
121 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
122 // prevent out of range exception when calling unordered_map.at()
123 continue;
124 }
125 const auto& callbacksForId = mIdToCallbacksMap.at(id);
126 LOG(VERBOSE) << "Invoking " << callbacksForId.size() << " callbacks for ID=" << id;
127 for (const TelemetryCallback& tc : callbacksForId) {
128 if (invokedCallbacks.find(tc) != invokedCallbacks.end()) {
129 // skipping already invoked callbacks
130 continue;
131 }
132 invokedCallbacks.insert(tc);
133 ndk::ScopedAStatus status =
134 tc.callback->onChange(findCarDataIdsIntersection(tc.config.carDataIds));
135 if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
136 status.getStatus() == STATUS_DEAD_OBJECT) {
137 LOG(WARNING) << "Failed to invoke onChange() on a dead object, removing callback";
138 removeCallback(tc.callback);
139 }
140 }
141 }
142 }
143
getListener()144 std::shared_ptr<ICarDataListener> TelemetryServer::getListener() {
145 const std::scoped_lock<std::mutex> lock(mMutex);
146 return mCarDataListener;
147 }
148
dump(int fd)149 void TelemetryServer::dump(int fd) {
150 const std::scoped_lock<std::mutex> lock(mMutex);
151 dprintf(fd, " TelemetryServer:\n");
152 mRingBuffer.dump(fd);
153 }
154
addCallback(const CallbackConfig & config,const std::shared_ptr<ICarTelemetryCallback> & callback)155 Result<void> TelemetryServer::addCallback(const CallbackConfig& config,
156 const std::shared_ptr<ICarTelemetryCallback>& callback) {
157 const std::scoped_lock<std::mutex> lock(mMutex);
158 TelemetryCallback cb(config, callback);
159 if (mCallbacks.find(cb) != mCallbacks.end()) {
160 const std::string msg = "The ICarTelemetryCallback already exists. "
161 "Use removeCarTelemetryCallback() to remove it first";
162 LOG(WARNING) << msg;
163 return Error(EX_ILLEGAL_ARGUMENT) << msg;
164 }
165
166 mCallbacks.insert(cb);
167
168 // link each interested CarData ID with the new callback
169 for (int32_t id : config.carDataIds) {
170 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
171 mIdToCallbacksMap[id] =
172 std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction>{cb};
173 } else {
174 mIdToCallbacksMap.at(id).insert(cb);
175 }
176 LOG(VERBOSE) << "CarData ID=" << id << " has " << mIdToCallbacksMap.at(id).size()
177 << " associated callbacks";
178 }
179
180 std::vector<int32_t> interestedIds = findCarDataIdsIntersection(config.carDataIds);
181 if (interestedIds.size() == 0) {
182 return {};
183 }
184 LOG(VERBOSE) << "Notifying new callback with active CarData IDs";
185 ndk::ScopedAStatus status = callback->onChange(interestedIds);
186 if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
187 status.getStatus() == STATUS_DEAD_OBJECT) {
188 removeCallback(callback);
189 return Error(EX_ILLEGAL_ARGUMENT)
190 << "Failed to invoke onChange() on a dead object, removing callback";
191 }
192 return {};
193 }
194
removeCallback(const std::shared_ptr<ICarTelemetryCallback> & callback)195 Result<void> TelemetryServer::removeCallback(
196 const std::shared_ptr<ICarTelemetryCallback>& callback) {
197 const std::scoped_lock<std::mutex> lock(mMutex);
198 auto it = mCallbacks.find(TelemetryCallback(callback));
199 if (it == mCallbacks.end()) {
200 constexpr char msg[] = "Attempting to remove a CarTelemetryCallback that does not exist";
201 LOG(WARNING) << msg;
202 return Error(EX_ILLEGAL_ARGUMENT) << msg;
203 }
204
205 const TelemetryCallback& tc = *it;
206 // unlink callback from ID in the mIdToCallbacksMap
207 for (int32_t id : tc.config.carDataIds) {
208 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
209 LOG(ERROR) << "The callback is not linked to its interested IDs.";
210 continue;
211 }
212 auto& associatedCallbacks = mIdToCallbacksMap.at(id);
213 auto associatedCallbackIterator = associatedCallbacks.find(tc);
214 if (associatedCallbackIterator == associatedCallbacks.end()) {
215 continue;
216 }
217 associatedCallbacks.erase(associatedCallbackIterator);
218 LOG(VERBOSE) << "After unlinking a callback from ID=" << id << ", the ID has "
219 << mIdToCallbacksMap.at(id).size() << " associated callbacks";
220 if (associatedCallbacks.size() == 0) {
221 mIdToCallbacksMap.erase(id);
222 }
223 }
224
225 mCallbacks.erase(it);
226 LOG(VERBOSE) << "After removeCallback, there are " << mCallbacks.size()
227 << " callbacks in cartelemetryd";
228 return {};
229 }
230
writeCarData(const std::vector<CarData> & dataList,uid_t publisherUid)231 void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) {
232 const std::scoped_lock<std::mutex> lock(mMutex);
233 bool bufferWasEmptyBefore = mRingBuffer.size() == 0;
234 for (auto&& data : dataList) {
235 // ignore data that has no subscribers in CarTelemetryService
236 if (mCarDataIds.find(data.id) == mCarDataIds.end()) {
237 LOG(VERBOSE) << "Ignoring CarData with ID=" << data.id;
238 continue;
239 }
240 mRingBuffer.push({data.id, data.content, publisherUid});
241 }
242 // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling
243 // too many unnecessary idendical messages in the looper.
244 if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) {
245 mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
246 kMsgPushCarDataToListener);
247 }
248 }
249
250 // Runs on the main thread.
pushCarDataToListeners()251 void TelemetryServer::pushCarDataToListeners() {
252 std::vector<CarDataInternal> pendingCarDataInternals;
253 {
254 const std::scoped_lock<std::mutex> lock(mMutex);
255 // Remove extra messages.
256 mLooper->removeMessages(mMessageHandler, kMsgPushCarDataToListener);
257 if (mCarDataListener == nullptr || mRingBuffer.size() == 0) {
258 return;
259 }
260 // Push elements to pendingCarDataInternals in reverse order so we can send data
261 // from the back of the pendingCarDataInternals vector.
262 while (mRingBuffer.size() > 0) {
263 auto carData = std::move(mRingBuffer.popBack());
264 CarDataInternal data;
265 data.id = carData.mId;
266 data.content = std::move(carData.mContent);
267 pendingCarDataInternals.push_back(data);
268 }
269 }
270
271 // TODO(b/186477983): send data in batch to improve performance, but careful sending too
272 // many data at once, as it could clog the Binder - it has <1MB limit.
273 while (!pendingCarDataInternals.empty()) {
274 ndk::ScopedAStatus status = ndk::ScopedAStatus::ok();
275 {
276 const std::scoped_lock<std::mutex> lock(mMutex);
277 if (mCarDataListener != nullptr) {
278 status = mCarDataListener->onCarDataReceived({pendingCarDataInternals.back()});
279 } else {
280 status = ndk::ScopedAStatus::
281 fromServiceSpecificErrorWithMessage(EX_NULL_POINTER,
282 "mCarDataListener is currently set to "
283 "null, will try again.");
284 }
285 }
286 if (!status.isOk()) {
287 LOG(WARNING) << "Failed to push CarDataInternal, will try again. Status: "
288 << status.getStatus()
289 << ", service-specific error: " << status.getServiceSpecificError()
290 << ", message: " << status.getMessage()
291 << ", exception code: " << status.getExceptionCode()
292 << ", description: " << status.getDescription();
293 sleep(kPushCarDataFailureDelaySeconds.count());
294 } else {
295 pendingCarDataInternals.pop_back();
296 }
297 }
298 }
299
MessageHandlerImpl(TelemetryServer * server)300 TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) :
301 mTelemetryServer(server) {}
302
handleMessage(const Message & message)303 void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) {
304 switch (message.what) {
305 case kMsgPushCarDataToListener:
306 mTelemetryServer->pushCarDataToListeners();
307 break;
308 default:
309 LOG(WARNING) << "Unknown message: " << message.what;
310 }
311 }
312
313 } // namespace telemetry
314 } // namespace automotive
315 } // namespace android
316