1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.car.telemetry.publisher; 18 19 import android.annotation.NonNull; 20 import android.automotive.telemetry.internal.CarDataInternal; 21 import android.automotive.telemetry.internal.ICarDataListener; 22 import android.automotive.telemetry.internal.ICarTelemetryInternal; 23 import android.car.builtin.os.ServiceManagerHelper; 24 import android.car.builtin.util.Slogf; 25 import android.car.telemetry.TelemetryProto; 26 import android.os.Handler; 27 import android.os.IBinder; 28 import android.os.PersistableBundle; 29 import android.os.RemoteException; 30 import android.util.ArraySet; 31 import android.util.SparseArray; 32 33 import com.android.automotive.telemetry.CarDataProto; 34 import com.android.car.CarLog; 35 import com.android.car.telemetry.databroker.DataSubscriber; 36 import com.android.car.telemetry.sessioncontroller.SessionAnnotation; 37 import com.android.car.telemetry.sessioncontroller.SessionController; 38 import com.android.internal.annotations.VisibleForTesting; 39 import com.android.internal.util.Preconditions; 40 41 import java.nio.charset.StandardCharsets; 42 import java.util.ArrayList; 43 import java.util.Iterator; 44 45 /** 46 * Publisher for cartelemtryd service (aka ICarTelemetry). 47 * 48 * <p>When a subscriber is added, the publisher binds to ICarTelemetryInternal and starts listening 49 * for incoming CarData. The matching CarData will be pushed to the subscriber. It unbinds itself 50 * from ICarTelemetryInternal if there are no subscribers. 51 * 52 * <p>See {@code packages/services/Car/cpp/telemetry/cartelemetryd} to learn more about the service. 53 */ 54 public class CarTelemetrydPublisher extends AbstractPublisher { 55 private static final boolean DEBUG = false; // STOPSHIP if true 56 private static final String SERVICE_NAME = ICarTelemetryInternal.DESCRIPTOR + "/default"; 57 private static final int BINDER_FLAGS = 0; 58 59 private final SparseArray<ArrayList<DataSubscriber>> mCarIdSubscriberLookUp = 60 new SparseArray<>(); 61 // All the methods in this class are expected to be called on this handler's thread. 62 private final Handler mTelemetryHandler; 63 private final SessionController mSessionController; 64 private final ICarDataListener mListener = new ICarDataListener.Stub() { 65 @Override 66 public void onCarDataReceived( 67 @NonNull final CarDataInternal[] dataList) throws RemoteException { 68 if (DEBUG) { 69 Slogf.d(CarLog.TAG_TELEMETRY, 70 "Received " + dataList.length + " CarData from cartelemetryd"); 71 } 72 // TODO(b/189142577): Create custom Handler and post message to improve performance 73 mTelemetryHandler.post(() -> onCarDataListReceived(dataList)); 74 } 75 76 @Override 77 public String getInterfaceHash() { 78 return ICarDataListener.HASH; 79 } 80 81 @Override 82 public int getInterfaceVersion() { 83 return ICarDataListener.VERSION; 84 } 85 }; 86 private final IBinder.DeathRecipient mDeathRecipient = this::onBinderDied; 87 88 private ICarTelemetryInternal mCarTelemetryInternal; 89 CarTelemetrydPublisher( @onNull PublisherListener listener, @NonNull Handler telemetryHandler, @NonNull SessionController sessionController)90 CarTelemetrydPublisher( 91 @NonNull PublisherListener listener, @NonNull Handler telemetryHandler, 92 @NonNull SessionController sessionController) { 93 super(listener); 94 this.mTelemetryHandler = telemetryHandler; 95 this.mSessionController = sessionController; 96 } 97 98 /** Called when binder for ICarTelemetry service is died. */ onBinderDied()99 private void onBinderDied() { 100 // TODO(b/189142577): Create custom Handler and post message to improve performance 101 mTelemetryHandler.post(() -> { 102 if (mCarTelemetryInternal != null) { 103 mCarTelemetryInternal.asBinder().unlinkToDeath(mDeathRecipient, BINDER_FLAGS); 104 mCarTelemetryInternal = null; 105 } 106 // TODO(b/241441036): Revisit actions taken when the binder dies. 107 onPublisherFailure( 108 getMetricsConfigs(), 109 new IllegalStateException("ICarTelemetryInternal binder died")); 110 }); 111 } 112 113 /** 114 * Connects to ICarTelemetryInternal service and starts listening for CarData. 115 * 116 * @return true for success or if cartelemetryd is already connected, false otherwise. 117 */ connectToCarTelemetryd()118 private boolean connectToCarTelemetryd() { 119 if (mCarTelemetryInternal != null) { 120 return true; 121 } 122 IBinder binder = ServiceManagerHelper.checkService(SERVICE_NAME); 123 if (binder == null) { 124 onPublisherFailure( 125 getMetricsConfigs(), 126 new IllegalStateException( 127 "Failed to connect to the ICarTelemetryInternal: service is not " 128 + "ready")); 129 return false; 130 } 131 try { 132 binder.linkToDeath(mDeathRecipient, BINDER_FLAGS); 133 } catch (RemoteException e) { 134 onPublisherFailure( 135 getMetricsConfigs(), 136 new IllegalStateException( 137 "Failed to connect to the ICarTelemetryInternal: linkToDeath failed", 138 e)); 139 return false; 140 } 141 mCarTelemetryInternal = ICarTelemetryInternal.Stub.asInterface(binder); 142 try { 143 mCarTelemetryInternal.setListener(mListener); 144 } catch (RemoteException e) { 145 binder.unlinkToDeath(mDeathRecipient, BINDER_FLAGS); 146 mCarTelemetryInternal = null; 147 onPublisherFailure( 148 getMetricsConfigs(), 149 new IllegalStateException( 150 "Failed to connect to the ICarTelemetryInternal: Cannot set CarData " 151 + "listener", e)); 152 return false; 153 } 154 return true; 155 } 156 157 @NonNull getMetricsConfigs()158 private ArrayList<TelemetryProto.MetricsConfig> getMetricsConfigs() { 159 ArraySet<TelemetryProto.MetricsConfig> uniqueConfigs = 160 new ArraySet<TelemetryProto.MetricsConfig>(); 161 for (int i = 0; i < mCarIdSubscriberLookUp.size(); i++) { 162 ArrayList<DataSubscriber> subscribers = mCarIdSubscriberLookUp.valueAt(i); 163 for (int j = 0; j < subscribers.size(); j++) { 164 uniqueConfigs.add(subscribers.get(j).getMetricsConfig()); 165 } 166 } 167 ArrayList<TelemetryProto.MetricsConfig> allConfigs = 168 new ArrayList<TelemetryProto.MetricsConfig>(); 169 Iterator<TelemetryProto.MetricsConfig> iterator = uniqueConfigs.iterator(); 170 while (iterator.hasNext()) { 171 allConfigs.add(iterator.next()); 172 } 173 return allConfigs; 174 } 175 176 /** 177 * Disconnects from ICarTelemetryInternal service. 178 * 179 * @throws IllegalStateException if fails to clear the listener. 180 */ disconnectFromCarTelemetryd()181 private void disconnectFromCarTelemetryd() { 182 if (mCarTelemetryInternal == null) { 183 return; // already disconnected 184 } 185 try { 186 mCarTelemetryInternal.clearListener(); 187 } catch (RemoteException e) { 188 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove ICarTelemetryInternal listener", e); 189 } 190 mCarTelemetryInternal.asBinder().unlinkToDeath(mDeathRecipient, BINDER_FLAGS); 191 mCarTelemetryInternal = null; 192 } 193 194 @VisibleForTesting isConnectedToCarTelemetryd()195 boolean isConnectedToCarTelemetryd() { 196 return mCarTelemetryInternal != null; 197 } 198 199 @Override addDataSubscriber(@onNull DataSubscriber subscriber)200 public void addDataSubscriber(@NonNull DataSubscriber subscriber) { 201 TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam(); 202 Preconditions.checkArgument( 203 publisherParam.getPublisherCase() 204 == TelemetryProto.Publisher.PublisherCase.CARTELEMETRYD, 205 "Subscribers only with CarTelemetryd publisher are supported by this class."); 206 int carDataId = publisherParam.getCartelemetryd().getId(); 207 CarDataProto.CarData.PushedCase carDataCase = 208 CarDataProto.CarData.PushedCase.forNumber(carDataId); 209 // TODO(b/241249252): Revise the check to accommodate data in OEM ID range, 10K-20K. 210 Preconditions.checkArgument( 211 carDataCase != null 212 && carDataCase != CarDataProto.CarData.PushedCase.PUSHED_NOT_SET, 213 "Invalid CarData ID " + carDataId 214 + ". Please see CarData.proto for the list of available IDs."); 215 216 ArrayList<DataSubscriber> currentSubscribers = mCarIdSubscriberLookUp.get(carDataId); 217 if (currentSubscribers == null) { 218 currentSubscribers = new ArrayList<>(); 219 mCarIdSubscriberLookUp.put(carDataId, currentSubscribers); 220 } 221 currentSubscribers.add(subscriber); 222 223 if (!connectToCarTelemetryd()) { 224 // logging is done in connectToCarTelemetryd, do not double log here 225 return; 226 } 227 Slogf.d(CarLog.TAG_TELEMETRY, "Subscribing to CarData.id=%d", carDataId); 228 // No need to make a binder call if the given CarDataId is already subscribed to. 229 if (currentSubscribers.size() > 1) { 230 return; 231 } 232 233 try { 234 mCarTelemetryInternal.addCarDataIds(new int[]{carDataId}); 235 } catch (RemoteException e) { 236 onPublisherFailure( 237 getMetricsConfigs(), 238 new IllegalStateException( 239 "Failed to make addCarDataIds binder call to ICarTelemetryInternal " 240 + "for CarDataID = " 241 + carDataId, e)); 242 return; 243 } 244 } 245 246 @Override removeDataSubscriber(@onNull DataSubscriber subscriber)247 public void removeDataSubscriber(@NonNull DataSubscriber subscriber) { 248 int idToRemove = subscriber.getPublisherParam().getCartelemetryd().getId(); 249 // TODO(b/241251062): Revise to consider throwing IllegalArgumentException and checking 250 // for subscriber type like some other publisher implementations do. 251 ArrayList<DataSubscriber> currentSubscribers = mCarIdSubscriberLookUp.get(idToRemove); 252 if (currentSubscribers == null) { 253 // No subscribers were found for a given id. 254 Slogf.e(CarLog.TAG_TELEMETRY, 255 "Subscriber for CarData.id=%d is not present among subscriptions. This is not" 256 + " expected.", 257 idToRemove); 258 return; 259 } 260 currentSubscribers.remove(subscriber); 261 if (currentSubscribers.isEmpty()) { 262 mCarIdSubscriberLookUp.remove(idToRemove); 263 try { 264 mCarTelemetryInternal.removeCarDataIds(new int[]{idToRemove}); 265 } catch (RemoteException e) { 266 Slogf.e(CarLog.TAG_TELEMETRY, 267 "removeCarDataIds binder call failed for CarData.id=%d", idToRemove); 268 } 269 } 270 if (mCarIdSubscriberLookUp.size() == 0) { 271 disconnectFromCarTelemetryd(); 272 } 273 } 274 275 @Override removeAllDataSubscribers()276 public void removeAllDataSubscribers() { 277 int[] idsToRemove = new int[mCarIdSubscriberLookUp.size()]; 278 for (int index = 0; index < mCarIdSubscriberLookUp.size(); index++) { 279 idsToRemove[index] = mCarIdSubscriberLookUp.keyAt(index); 280 } 281 try { 282 mCarTelemetryInternal.removeCarDataIds(idsToRemove); 283 } catch (RemoteException e) { 284 Slogf.e(CarLog.TAG_TELEMETRY, 285 "removeCarDataIds binder call failed while unsubscribing from all data."); 286 } 287 mCarIdSubscriberLookUp.clear(); 288 289 disconnectFromCarTelemetryd(); 290 } 291 292 @Override hasDataSubscriber(@onNull DataSubscriber subscriber)293 public boolean hasDataSubscriber(@NonNull DataSubscriber subscriber) { 294 int id = subscriber.getPublisherParam().getCartelemetryd().getId(); 295 return mCarIdSubscriberLookUp.contains(id) && mCarIdSubscriberLookUp.get(id).contains( 296 subscriber); 297 } 298 299 /** 300 * Called when publisher receives new car data list. It's executed on the telemetry thread. 301 */ onCarDataListReceived(@onNull CarDataInternal[] dataList)302 private void onCarDataListReceived(@NonNull CarDataInternal[] dataList) { 303 for (CarDataInternal data : dataList) { 304 processCarData(data); 305 } 306 } 307 processCarData(@onNull CarDataInternal dataItem)308 private void processCarData(@NonNull CarDataInternal dataItem) { 309 ArrayList<DataSubscriber> currentSubscribers = mCarIdSubscriberLookUp.get(dataItem.id); 310 if (currentSubscribers == null) { 311 // It is possible the carId is no longer subscribed to while data is in-flight. 312 return; 313 } 314 String content = new String(dataItem.content, StandardCharsets.UTF_8); 315 PersistableBundle bundle = new PersistableBundle(); 316 bundle.putInt(Constants.CAR_TELEMETRYD_BUNDLE_KEY_ID, dataItem.id); 317 bundle.putString(Constants.CAR_TELEMETRYD_BUNDLE_KEY_CONTENT, content); 318 SessionAnnotation sessionAnnotation = mSessionController.getSessionAnnotation(); 319 sessionAnnotation.addAnnotationsToBundle(bundle); 320 for (int i = 0; i < currentSubscribers.size(); i++) { 321 currentSubscribers.get(i).push(bundle, 322 content.length() > DataSubscriber.SCRIPT_INPUT_SIZE_THRESHOLD_BYTES); 323 } 324 } 325 326 @Override handleSessionStateChange(SessionAnnotation annotation)327 protected void handleSessionStateChange(SessionAnnotation annotation) { 328 // We don't handle session state changes. We make synchronous calls to SessionController 329 // as soon as new data arrives to retrieve the current session annotations. 330 // Make sure to invoke sessionController.registerCallback(this::handleSessionStateChange) 331 // in the constructor once this method is implemented. 332 } 333 334 335 } 336