1 /*
2  * Copyright (c) 2021, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "TelemetryServer.h"
18 
19 #include "CarTelemetryImpl.h"
20 #include "RingBuffer.h"
21 
22 #include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
23 #include <android-base/logging.h>
24 
25 #include <inttypes.h>  // for PRIu64 and friends
26 
27 #include <cstdint>
28 #include <memory>
29 
30 namespace android {
31 namespace automotive {
32 namespace telemetry {
33 
34 namespace {
35 
36 using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
37 using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
38 using ::aidl::android::frameworks::automotive::telemetry::CarData;
39 using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetryCallback;
40 using ::android::base::Error;
41 using ::android::base::Result;
42 
// Looper message ID asking the server to flush buffered CarData to the ICarDataListener.
constexpr int kMsgPushCarDataToListener = 1;

// Back-off applied before retrying a push after the ICarDataListener failed to accept the data,
// giving the listener a little time to recover.
constexpr std::chrono::seconds kPushCarDataFailureDelaySeconds{1};
48 }  // namespace
49 
TelemetryServer(LooperWrapper * looper,const std::chrono::nanoseconds & pushCarDataDelayNs,const int maxBufferSize)50 TelemetryServer::TelemetryServer(LooperWrapper* looper,
51                                  const std::chrono::nanoseconds& pushCarDataDelayNs,
52                                  const int maxBufferSize) :
53       mLooper(looper),
54       mPushCarDataDelayNs(pushCarDataDelayNs),
55       mRingBuffer(maxBufferSize),
56       mMessageHandler(new MessageHandlerImpl(this)) {}
57 
setListener(const std::shared_ptr<ICarDataListener> & listener)58 void TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) {
59     const std::scoped_lock<std::mutex> lock(mMutex);
60     mCarDataListener = listener;
61     mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
62                                 kMsgPushCarDataToListener);
63 }
64 
clearListener()65 void TelemetryServer::clearListener() {
66     const std::scoped_lock<std::mutex> lock(mMutex);
67     if (mCarDataListener == nullptr) {
68         return;
69     }
70     mCarDataListener = nullptr;
71     mLooper->removeMessages(mMessageHandler, kMsgPushCarDataToListener);
72 }
73 
findCarDataIdsIntersection(const std::vector<int32_t> & ids)74 std::vector<int32_t> TelemetryServer::findCarDataIdsIntersection(const std::vector<int32_t>& ids) {
75     std::vector<int32_t> interestedIds;
76     for (int32_t id : ids) {
77         if (mCarDataIds.find(id) != mCarDataIds.end()) {
78             interestedIds.push_back(id);
79         }
80     }
81     return interestedIds;
82 }
83 
addCarDataIds(const std::vector<int32_t> & ids)84 void TelemetryServer::addCarDataIds(const std::vector<int32_t>& ids) {
85     const std::scoped_lock<std::mutex> lock(mMutex);
86     mCarDataIds.insert(ids.cbegin(), ids.cend());
87     std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction> invokedCallbacks;
88     LOG(VERBOSE) << "Received addCarDataIds call from CarTelemetryService, notifying callbacks";
89     for (int32_t id : ids) {
90         if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
91             // prevent out of range exception when calling unordered_map.at()
92             continue;
93         }
94         const auto& callbacksForId = mIdToCallbacksMap.at(id);
95         LOG(VERBOSE) << "Invoking " << callbacksForId.size() << " callbacks for ID=" << id;
96         for (const TelemetryCallback& tc : callbacksForId) {
97             if (invokedCallbacks.find(tc) != invokedCallbacks.end()) {
98                 // skipping already invoked callbacks
99                 continue;
100             }
101             invokedCallbacks.insert(tc);
102             ndk::ScopedAStatus status =
103                     tc.callback->onChange(findCarDataIdsIntersection(tc.config.carDataIds));
104             if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
105                 status.getStatus() == STATUS_DEAD_OBJECT) {
106                 LOG(WARNING) << "Failed to invoke onChange() on a dead object, removing callback";
107                 removeCallback(tc.callback);
108             }
109         }
110     }
111 }
112 
removeCarDataIds(const std::vector<int32_t> & ids)113 void TelemetryServer::removeCarDataIds(const std::vector<int32_t>& ids) {
114     const std::scoped_lock<std::mutex> lock(mMutex);
115     for (int32_t id : ids) {
116         mCarDataIds.erase(id);
117     }
118     std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction> invokedCallbacks;
119     LOG(VERBOSE) << "Received removeCarDataIds call from CarTelemetryService, notifying callbacks";
120     for (int32_t id : ids) {
121         if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
122             // prevent out of range exception when calling unordered_map.at()
123             continue;
124         }
125         const auto& callbacksForId = mIdToCallbacksMap.at(id);
126         LOG(VERBOSE) << "Invoking " << callbacksForId.size() << " callbacks for ID=" << id;
127         for (const TelemetryCallback& tc : callbacksForId) {
128             if (invokedCallbacks.find(tc) != invokedCallbacks.end()) {
129                 // skipping already invoked callbacks
130                 continue;
131             }
132             invokedCallbacks.insert(tc);
133             ndk::ScopedAStatus status =
134                     tc.callback->onChange(findCarDataIdsIntersection(tc.config.carDataIds));
135             if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
136                 status.getStatus() == STATUS_DEAD_OBJECT) {
137                 LOG(WARNING) << "Failed to invoke onChange() on a dead object, removing callback";
138                 removeCallback(tc.callback);
139             }
140         }
141     }
142 }
143 
getListener()144 std::shared_ptr<ICarDataListener> TelemetryServer::getListener() {
145     const std::scoped_lock<std::mutex> lock(mMutex);
146     return mCarDataListener;
147 }
148 
dump(int fd)149 void TelemetryServer::dump(int fd) {
150     const std::scoped_lock<std::mutex> lock(mMutex);
151     dprintf(fd, "  TelemetryServer:\n");
152     mRingBuffer.dump(fd);
153 }
154 
addCallback(const CallbackConfig & config,const std::shared_ptr<ICarTelemetryCallback> & callback)155 Result<void> TelemetryServer::addCallback(const CallbackConfig& config,
156                                           const std::shared_ptr<ICarTelemetryCallback>& callback) {
157     const std::scoped_lock<std::mutex> lock(mMutex);
158     TelemetryCallback cb(config, callback);
159     if (mCallbacks.find(cb) != mCallbacks.end()) {
160         const std::string msg = "The ICarTelemetryCallback already exists. "
161                                 "Use removeCarTelemetryCallback() to remove it first";
162         LOG(WARNING) << msg;
163         return Error(EX_ILLEGAL_ARGUMENT) << msg;
164     }
165 
166     mCallbacks.insert(cb);
167 
168     // link each interested CarData ID with the new callback
169     for (int32_t id : config.carDataIds) {
170         if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
171             mIdToCallbacksMap[id] =
172                     std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction>{cb};
173         } else {
174             mIdToCallbacksMap.at(id).insert(cb);
175         }
176         LOG(VERBOSE) << "CarData ID=" << id << " has " << mIdToCallbacksMap.at(id).size()
177                      << " associated callbacks";
178     }
179 
180     std::vector<int32_t> interestedIds = findCarDataIdsIntersection(config.carDataIds);
181     if (interestedIds.size() == 0) {
182         return {};
183     }
184     LOG(VERBOSE) << "Notifying new callback with active CarData IDs";
185     ndk::ScopedAStatus status = callback->onChange(interestedIds);
186     if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
187         status.getStatus() == STATUS_DEAD_OBJECT) {
188         removeCallback(callback);
189         return Error(EX_ILLEGAL_ARGUMENT)
190                 << "Failed to invoke onChange() on a dead object, removing callback";
191     }
192     return {};
193 }
194 
removeCallback(const std::shared_ptr<ICarTelemetryCallback> & callback)195 Result<void> TelemetryServer::removeCallback(
196         const std::shared_ptr<ICarTelemetryCallback>& callback) {
197     const std::scoped_lock<std::mutex> lock(mMutex);
198     auto it = mCallbacks.find(TelemetryCallback(callback));
199     if (it == mCallbacks.end()) {
200         constexpr char msg[] = "Attempting to remove a CarTelemetryCallback that does not exist";
201         LOG(WARNING) << msg;
202         return Error(EX_ILLEGAL_ARGUMENT) << msg;
203     }
204 
205     const TelemetryCallback& tc = *it;
206     // unlink callback from ID in the mIdToCallbacksMap
207     for (int32_t id : tc.config.carDataIds) {
208         if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
209             LOG(ERROR) << "The callback is not linked to its interested IDs.";
210             continue;
211         }
212         auto& associatedCallbacks = mIdToCallbacksMap.at(id);
213         auto associatedCallbackIterator = associatedCallbacks.find(tc);
214         if (associatedCallbackIterator == associatedCallbacks.end()) {
215             continue;
216         }
217         associatedCallbacks.erase(associatedCallbackIterator);
218         LOG(VERBOSE) << "After unlinking a callback from ID=" << id << ", the ID has "
219                      << mIdToCallbacksMap.at(id).size() << " associated callbacks";
220         if (associatedCallbacks.size() == 0) {
221             mIdToCallbacksMap.erase(id);
222         }
223     }
224 
225     mCallbacks.erase(it);
226     LOG(VERBOSE) << "After removeCallback, there are " << mCallbacks.size()
227                  << " callbacks in cartelemetryd";
228     return {};
229 }
230 
writeCarData(const std::vector<CarData> & dataList,uid_t publisherUid)231 void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) {
232     const std::scoped_lock<std::mutex> lock(mMutex);
233     bool bufferWasEmptyBefore = mRingBuffer.size() == 0;
234     for (auto&& data : dataList) {
235         // ignore data that has no subscribers in CarTelemetryService
236         if (mCarDataIds.find(data.id) == mCarDataIds.end()) {
237             LOG(VERBOSE) << "Ignoring CarData with ID=" << data.id;
238             continue;
239         }
240         mRingBuffer.push({data.id, data.content, publisherUid});
241     }
242     // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling
243     // too many unnecessary idendical messages in the looper.
244     if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) {
245         mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
246                                     kMsgPushCarDataToListener);
247     }
248 }
249 
250 // Runs on the main thread.
pushCarDataToListeners()251 void TelemetryServer::pushCarDataToListeners() {
252     std::vector<CarDataInternal> pendingCarDataInternals;
253     {
254         const std::scoped_lock<std::mutex> lock(mMutex);
255         // Remove extra messages.
256         mLooper->removeMessages(mMessageHandler, kMsgPushCarDataToListener);
257         if (mCarDataListener == nullptr || mRingBuffer.size() == 0) {
258             return;
259         }
260         // Push elements to pendingCarDataInternals in reverse order so we can send data
261         // from the back of the pendingCarDataInternals vector.
262         while (mRingBuffer.size() > 0) {
263             auto carData = std::move(mRingBuffer.popBack());
264             CarDataInternal data;
265             data.id = carData.mId;
266             data.content = std::move(carData.mContent);
267             pendingCarDataInternals.push_back(data);
268         }
269     }
270 
271     // TODO(b/186477983): send data in batch to improve performance, but careful sending too
272     //                    many data at once, as it could clog the Binder - it has <1MB limit.
273     while (!pendingCarDataInternals.empty()) {
274         ndk::ScopedAStatus status = ndk::ScopedAStatus::ok();
275         {
276             const std::scoped_lock<std::mutex> lock(mMutex);
277             if (mCarDataListener != nullptr) {
278                 status = mCarDataListener->onCarDataReceived({pendingCarDataInternals.back()});
279             } else {
280                 status = ndk::ScopedAStatus::
281                         fromServiceSpecificErrorWithMessage(EX_NULL_POINTER,
282                                                             "mCarDataListener is currently set to "
283                                                             "null, will try again.");
284             }
285         }
286         if (!status.isOk()) {
287             LOG(WARNING) << "Failed to push CarDataInternal, will try again. Status: "
288                          << status.getStatus()
289                          << ", service-specific error: " << status.getServiceSpecificError()
290                          << ", message: " << status.getMessage()
291                          << ", exception code: " << status.getExceptionCode()
292                          << ", description: " << status.getDescription();
293             sleep(kPushCarDataFailureDelaySeconds.count());
294         } else {
295             pendingCarDataInternals.pop_back();
296         }
297     }
298 }
299 
MessageHandlerImpl(TelemetryServer * server)300 TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) :
301       mTelemetryServer(server) {}
302 
handleMessage(const Message & message)303 void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) {
304     switch (message.what) {
305         case kMsgPushCarDataToListener:
306             mTelemetryServer->pushCarDataToListeners();
307             break;
308         default:
309             LOG(WARNING) << "Unknown message: " << message.what;
310     }
311 }
312 
313 }  // namespace telemetry
314 }  // namespace automotive
315 }  // namespace android
316