1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car.telemetry.publisher;
18 
19 import android.annotation.NonNull;
20 import android.automotive.telemetry.internal.CarDataInternal;
21 import android.automotive.telemetry.internal.ICarDataListener;
22 import android.automotive.telemetry.internal.ICarTelemetryInternal;
23 import android.car.builtin.os.ServiceManagerHelper;
24 import android.car.builtin.util.Slogf;
25 import android.car.telemetry.TelemetryProto;
26 import android.os.Handler;
27 import android.os.IBinder;
28 import android.os.PersistableBundle;
29 import android.os.RemoteException;
30 import android.util.ArraySet;
31 import android.util.SparseArray;
32 
33 import com.android.automotive.telemetry.CarDataProto;
34 import com.android.car.CarLog;
35 import com.android.car.telemetry.databroker.DataSubscriber;
36 import com.android.car.telemetry.sessioncontroller.SessionAnnotation;
37 import com.android.car.telemetry.sessioncontroller.SessionController;
38 import com.android.internal.annotations.VisibleForTesting;
39 import com.android.internal.util.Preconditions;
40 
41 import java.nio.charset.StandardCharsets;
42 import java.util.ArrayList;
43 import java.util.Iterator;
44 
45 /**
 * Publisher for cartelemetryd service (aka ICarTelemetry).
47  *
48  * <p>When a subscriber is added, the publisher binds to ICarTelemetryInternal and starts listening
49  * for incoming CarData. The matching CarData will be pushed to the subscriber. It unbinds itself
50  * from ICarTelemetryInternal if there are no subscribers.
51  *
52  * <p>See {@code packages/services/Car/cpp/telemetry/cartelemetryd} to learn more about the service.
53  */
54 public class CarTelemetrydPublisher extends AbstractPublisher {
55     private static final boolean DEBUG = false;  // STOPSHIP if true
56     private static final String SERVICE_NAME = ICarTelemetryInternal.DESCRIPTOR + "/default";
57     private static final int BINDER_FLAGS = 0;
58 
59     private final SparseArray<ArrayList<DataSubscriber>> mCarIdSubscriberLookUp =
60             new SparseArray<>();
61     // All the methods in this class are expected to be called on this handler's thread.
62     private final Handler mTelemetryHandler;
63     private final SessionController mSessionController;
64     private final ICarDataListener mListener = new ICarDataListener.Stub() {
65         @Override
66         public void onCarDataReceived(
67                 @NonNull final CarDataInternal[] dataList) throws RemoteException {
68             if (DEBUG) {
69                 Slogf.d(CarLog.TAG_TELEMETRY,
70                         "Received " + dataList.length + " CarData from cartelemetryd");
71             }
72             // TODO(b/189142577): Create custom Handler and post message to improve performance
73             mTelemetryHandler.post(() -> onCarDataListReceived(dataList));
74         }
75 
76         @Override
77         public String getInterfaceHash() {
78             return ICarDataListener.HASH;
79         }
80 
81         @Override
82         public int getInterfaceVersion() {
83             return ICarDataListener.VERSION;
84         }
85     };
86     private final IBinder.DeathRecipient mDeathRecipient = this::onBinderDied;
87 
88     private ICarTelemetryInternal mCarTelemetryInternal;
89 
CarTelemetrydPublisher( @onNull PublisherListener listener, @NonNull Handler telemetryHandler, @NonNull SessionController sessionController)90     CarTelemetrydPublisher(
91             @NonNull PublisherListener listener, @NonNull Handler telemetryHandler,
92             @NonNull SessionController sessionController) {
93         super(listener);
94         this.mTelemetryHandler = telemetryHandler;
95         this.mSessionController = sessionController;
96     }
97 
98     /** Called when binder for ICarTelemetry service is died. */
onBinderDied()99     private void onBinderDied() {
100         // TODO(b/189142577): Create custom Handler and post message to improve performance
101         mTelemetryHandler.post(() -> {
102             if (mCarTelemetryInternal != null) {
103                 mCarTelemetryInternal.asBinder().unlinkToDeath(mDeathRecipient, BINDER_FLAGS);
104                 mCarTelemetryInternal = null;
105             }
106             // TODO(b/241441036): Revisit actions taken when the binder dies.
107             onPublisherFailure(
108                     getMetricsConfigs(),
109                     new IllegalStateException("ICarTelemetryInternal binder died"));
110         });
111     }
112 
113     /**
114      * Connects to ICarTelemetryInternal service and starts listening for CarData.
115      *
116      * @return true for success or if cartelemetryd is already connected, false otherwise.
117      */
connectToCarTelemetryd()118     private boolean connectToCarTelemetryd() {
119         if (mCarTelemetryInternal != null) {
120             return true;
121         }
122         IBinder binder = ServiceManagerHelper.checkService(SERVICE_NAME);
123         if (binder == null) {
124             onPublisherFailure(
125                     getMetricsConfigs(),
126                     new IllegalStateException(
127                             "Failed to connect to the ICarTelemetryInternal: service is not "
128                                     + "ready"));
129             return false;
130         }
131         try {
132             binder.linkToDeath(mDeathRecipient, BINDER_FLAGS);
133         } catch (RemoteException e) {
134             onPublisherFailure(
135                     getMetricsConfigs(),
136                     new IllegalStateException(
137                             "Failed to connect to the ICarTelemetryInternal: linkToDeath failed",
138                             e));
139             return false;
140         }
141         mCarTelemetryInternal = ICarTelemetryInternal.Stub.asInterface(binder);
142         try {
143             mCarTelemetryInternal.setListener(mListener);
144         } catch (RemoteException e) {
145             binder.unlinkToDeath(mDeathRecipient, BINDER_FLAGS);
146             mCarTelemetryInternal = null;
147             onPublisherFailure(
148                     getMetricsConfigs(),
149                     new IllegalStateException(
150                             "Failed to connect to the ICarTelemetryInternal: Cannot set CarData "
151                                     + "listener", e));
152             return false;
153         }
154         return true;
155     }
156 
157     @NonNull
getMetricsConfigs()158     private ArrayList<TelemetryProto.MetricsConfig> getMetricsConfigs() {
159         ArraySet<TelemetryProto.MetricsConfig> uniqueConfigs =
160                 new ArraySet<TelemetryProto.MetricsConfig>();
161         for (int i = 0; i < mCarIdSubscriberLookUp.size(); i++) {
162             ArrayList<DataSubscriber> subscribers = mCarIdSubscriberLookUp.valueAt(i);
163             for (int j = 0; j < subscribers.size(); j++) {
164                 uniqueConfigs.add(subscribers.get(j).getMetricsConfig());
165             }
166         }
167         ArrayList<TelemetryProto.MetricsConfig> allConfigs =
168                 new ArrayList<TelemetryProto.MetricsConfig>();
169         Iterator<TelemetryProto.MetricsConfig> iterator = uniqueConfigs.iterator();
170         while (iterator.hasNext()) {
171             allConfigs.add(iterator.next());
172         }
173         return allConfigs;
174     }
175 
176     /**
177      * Disconnects from ICarTelemetryInternal service.
178      *
179      * @throws IllegalStateException if fails to clear the listener.
180      */
disconnectFromCarTelemetryd()181     private void disconnectFromCarTelemetryd() {
182         if (mCarTelemetryInternal == null) {
183             return;  // already disconnected
184         }
185         try {
186             mCarTelemetryInternal.clearListener();
187         } catch (RemoteException e) {
188             Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove ICarTelemetryInternal listener", e);
189         }
190         mCarTelemetryInternal.asBinder().unlinkToDeath(mDeathRecipient, BINDER_FLAGS);
191         mCarTelemetryInternal = null;
192     }
193 
194     @VisibleForTesting
isConnectedToCarTelemetryd()195     boolean isConnectedToCarTelemetryd() {
196         return mCarTelemetryInternal != null;
197     }
198 
199     @Override
addDataSubscriber(@onNull DataSubscriber subscriber)200     public void addDataSubscriber(@NonNull DataSubscriber subscriber) {
201         TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
202         Preconditions.checkArgument(
203                 publisherParam.getPublisherCase()
204                         == TelemetryProto.Publisher.PublisherCase.CARTELEMETRYD,
205                 "Subscribers only with CarTelemetryd publisher are supported by this class.");
206         int carDataId = publisherParam.getCartelemetryd().getId();
207         CarDataProto.CarData.PushedCase carDataCase =
208                 CarDataProto.CarData.PushedCase.forNumber(carDataId);
209         // TODO(b/241249252): Revise the check to accommodate data in OEM ID range, 10K-20K.
210         Preconditions.checkArgument(
211                 carDataCase != null
212                         && carDataCase != CarDataProto.CarData.PushedCase.PUSHED_NOT_SET,
213                 "Invalid CarData ID " + carDataId
214                         + ". Please see CarData.proto for the list of available IDs.");
215 
216         ArrayList<DataSubscriber> currentSubscribers = mCarIdSubscriberLookUp.get(carDataId);
217         if (currentSubscribers == null) {
218             currentSubscribers = new ArrayList<>();
219             mCarIdSubscriberLookUp.put(carDataId, currentSubscribers);
220         }
221         currentSubscribers.add(subscriber);
222 
223         if (!connectToCarTelemetryd()) {
224             // logging is done in connectToCarTelemetryd, do not double log here
225             return;
226         }
227         Slogf.d(CarLog.TAG_TELEMETRY, "Subscribing to CarData.id=%d", carDataId);
228         // No need to make a binder call if the given CarDataId is already subscribed to.
229         if (currentSubscribers.size() > 1) {
230             return;
231         }
232 
233         try {
234             mCarTelemetryInternal.addCarDataIds(new int[]{carDataId});
235         } catch (RemoteException e) {
236             onPublisherFailure(
237                     getMetricsConfigs(),
238                     new IllegalStateException(
239                             "Failed to make addCarDataIds binder call to ICarTelemetryInternal "
240                                     + "for CarDataID = "
241                                     + carDataId, e));
242             return;
243         }
244     }
245 
246     @Override
removeDataSubscriber(@onNull DataSubscriber subscriber)247     public void removeDataSubscriber(@NonNull DataSubscriber subscriber) {
248         int idToRemove = subscriber.getPublisherParam().getCartelemetryd().getId();
249         // TODO(b/241251062): Revise to consider throwing IllegalArgumentException and checking
250         //  for subscriber type like some other publisher implementations do.
251         ArrayList<DataSubscriber> currentSubscribers = mCarIdSubscriberLookUp.get(idToRemove);
252         if (currentSubscribers == null) {
253             // No subscribers were found for a given id.
254             Slogf.e(CarLog.TAG_TELEMETRY,
255                     "Subscriber for CarData.id=%d is not present among subscriptions. This is not"
256                             + " expected.",
257                     idToRemove);
258             return;
259         }
260         currentSubscribers.remove(subscriber);
261         if (currentSubscribers.isEmpty()) {
262             mCarIdSubscriberLookUp.remove(idToRemove);
263             try {
264                 mCarTelemetryInternal.removeCarDataIds(new int[]{idToRemove});
265             } catch (RemoteException e) {
266                 Slogf.e(CarLog.TAG_TELEMETRY,
267                         "removeCarDataIds binder call failed for CarData.id=%d", idToRemove);
268             }
269         }
270         if (mCarIdSubscriberLookUp.size() == 0) {
271             disconnectFromCarTelemetryd();
272         }
273     }
274 
275     @Override
removeAllDataSubscribers()276     public void removeAllDataSubscribers() {
277         int[] idsToRemove = new int[mCarIdSubscriberLookUp.size()];
278         for (int index = 0; index < mCarIdSubscriberLookUp.size(); index++) {
279             idsToRemove[index] = mCarIdSubscriberLookUp.keyAt(index);
280         }
281         try {
282             mCarTelemetryInternal.removeCarDataIds(idsToRemove);
283         } catch (RemoteException e) {
284             Slogf.e(CarLog.TAG_TELEMETRY,
285                     "removeCarDataIds binder call failed while unsubscribing from all data.");
286         }
287         mCarIdSubscriberLookUp.clear();
288 
289         disconnectFromCarTelemetryd();
290     }
291 
292     @Override
hasDataSubscriber(@onNull DataSubscriber subscriber)293     public boolean hasDataSubscriber(@NonNull DataSubscriber subscriber) {
294         int id = subscriber.getPublisherParam().getCartelemetryd().getId();
295         return mCarIdSubscriberLookUp.contains(id) && mCarIdSubscriberLookUp.get(id).contains(
296                 subscriber);
297     }
298 
299     /**
300      * Called when publisher receives new car data list. It's executed on the telemetry thread.
301      */
onCarDataListReceived(@onNull CarDataInternal[] dataList)302     private void onCarDataListReceived(@NonNull CarDataInternal[] dataList) {
303         for (CarDataInternal data : dataList) {
304             processCarData(data);
305         }
306     }
307 
processCarData(@onNull CarDataInternal dataItem)308     private void processCarData(@NonNull CarDataInternal dataItem) {
309         ArrayList<DataSubscriber> currentSubscribers = mCarIdSubscriberLookUp.get(dataItem.id);
310         if (currentSubscribers == null) {
311             // It is possible the carId is no longer subscribed to while data is in-flight.
312             return;
313         }
314         String content = new String(dataItem.content, StandardCharsets.UTF_8);
315         PersistableBundle bundle = new PersistableBundle();
316         bundle.putInt(Constants.CAR_TELEMETRYD_BUNDLE_KEY_ID, dataItem.id);
317         bundle.putString(Constants.CAR_TELEMETRYD_BUNDLE_KEY_CONTENT, content);
318         SessionAnnotation sessionAnnotation = mSessionController.getSessionAnnotation();
319         sessionAnnotation.addAnnotationsToBundle(bundle);
320         for (int i = 0; i < currentSubscribers.size(); i++) {
321             currentSubscribers.get(i).push(bundle,
322                     content.length() > DataSubscriber.SCRIPT_INPUT_SIZE_THRESHOLD_BYTES);
323         }
324     }
325 
326     @Override
handleSessionStateChange(SessionAnnotation annotation)327     protected void handleSessionStateChange(SessionAnnotation annotation) {
328         // We don't handle session state changes. We make synchronous calls to SessionController
329         // as soon as new data arrives to retrieve the current session annotations.
330         // Make sure to invoke sessionController.registerCallback(this::handleSessionStateChange)
331         // in the constructor once this method is implemented.
332     }
333 
334 
335 }
336