1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.car.telemetry.databroker; 18 19 import static com.android.car.telemetry.CarTelemetryService.DEBUG; 20 21 import android.annotation.NonNull; 22 import android.annotation.Nullable; 23 import android.car.builtin.util.Slogf; 24 import android.car.builtin.util.TimingsTraceLog; 25 import android.car.telemetry.TelemetryProto; 26 import android.car.telemetry.TelemetryProto.MetricsConfig; 27 import android.car.telemetry.TelemetryProto.TelemetryError; 28 import android.content.ComponentName; 29 import android.content.Context; 30 import android.content.Intent; 31 import android.content.ServiceConnection; 32 import android.content.pm.PackageInfo; 33 import android.content.pm.PackageManager; 34 import android.os.Handler; 35 import android.os.HandlerThread; 36 import android.os.IBinder; 37 import android.os.Looper; 38 import android.os.Message; 39 import android.os.ParcelFileDescriptor; 40 import android.os.PersistableBundle; 41 import android.os.RemoteException; 42 import android.os.UserHandle; 43 import android.util.ArrayMap; 44 import android.util.Log; 45 import android.util.SparseIntArray; 46 47 import com.android.car.CarLog; 48 import com.android.car.CarServiceUtils; 49 import com.android.car.internal.LargeParcelable; 50 import com.android.car.telemetry.CarTelemetryService; 51 import com.android.car.telemetry.ResultStore; 52 import com.android.car.telemetry.publisher.AbstractPublisher; 53 import com.android.car.telemetry.publisher.PublisherFactory; 54 import com.android.car.telemetry.scriptexecutorinterface.BundleList; 55 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor; 56 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener; 57 import com.android.car.telemetry.util.IoUtils; 58 import com.android.internal.annotations.VisibleForTesting; 59 60 import java.io.IOException; 61 import java.io.OutputStream; 62 import java.lang.ref.WeakReference; 63 import java.util.ArrayList; 64 import java.util.Iterator; 65 import java.util.List; 66 import java.util.concurrent.PriorityBlockingQueue; 67 68 /** 69 * Implementation of the data path component of CarTelemetryService. Forwards the published data 70 * from publishers to consumers subject to the Controller's decision. 71 * All methods should be called from the telemetry thread unless otherwise specified as thread-safe. 72 */ 73 public class DataBrokerImpl implements DataBroker { 74 75 @VisibleForTesting 76 static final int MSG_HANDLE_TASK = 1; 77 @VisibleForTesting 78 static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2; 79 @VisibleForTesting 80 static final int MSG_STOP_HANGING_SCRIPT = 3; 81 82 /** Bind to script executor 5 times before entering disabled state. */ 83 private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5; 84 85 /** Maximum wait time for a script to finish. */ 86 private static final long MAX_SCRIPT_EXECUTION_TIME_MILLIS = 30_000L; // 30 seconds 87 88 private static final String[] SCRIPT_EXECUTOR_PACKAGE_CANDIDATES = 89 {"com.android.car.scriptexecutor", "com.google.android.car.scriptexecutor"}; 90 private static final String SCRIPT_EXECUTOR_CLASS = 91 "com.android.car.scriptexecutor.ScriptExecutor"; 92 93 private final Context mContext; 94 private final PublisherFactory mPublisherFactory; 95 private final ResultStore mResultStore; 96 private final ScriptExecutorListener mScriptExecutorListener; 97 private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread( 98 CarTelemetryService.class.getSimpleName()); 99 private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper()); 100 101 /** Thread-safe priority queue for scheduling tasks. */ 102 private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue = 103 new PriorityBlockingQueue<>(); 104 105 /** 106 * Index is the type of {@link TelemetryProto.Publisher}, value is the number of tasks pending 107 * script execution that are produced by that publisher. 108 */ 109 private final SparseIntArray mPublisherCountArray = new SparseIntArray(); 110 111 /** 112 * Maps MetricsConfig name to its subscriptions. This map is useful for removing MetricsConfigs. 113 */ 114 private final ArrayMap<String, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>(); 115 116 /** 117 * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent 118 * doing futile work. 119 */ 120 private boolean mDisabled = false; 121 122 /** Current number of attempts to bind to ScriptExecutor. */ 123 private int mBindScriptExecutorAttempts = 0; 124 125 /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */ 126 private int mPriority = 1; 127 128 /** Waiting period between attempts to bind script executor. Can be shortened for tests. */ 129 @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L; 130 131 /** 132 * Name of the {@link MetricsConfig} that is currently running. 133 * A non-null value indicates ScriptExecutor is currently running this config, which means 134 * DataBroker should not make another ScriptExecutor binder call. 135 */ 136 private String mCurrentMetricsConfigName; 137 private IScriptExecutor mScriptExecutor; 138 private DataBrokerListener mDataBrokerListener; 139 140 /** 141 * Used only for the purpose of tracking the duration of running a script. The duration 142 * starts before the ScriptExecutor binder call and ends when a status is returned via 143 * ScriptExecutorListener or when the binder call throws an exception. 144 */ 145 private TimingsTraceLog mScriptExecutionTraceLog; 146 147 private final ServiceConnection mServiceConnection = new ServiceConnection() { 148 @Override 149 public void onServiceConnected(ComponentName name, IBinder service) { 150 mTelemetryHandler.post(() -> { 151 mScriptExecutor = IScriptExecutor.Stub.asInterface(service); 152 scheduleNextTask(); 153 }); 154 } 155 156 @Override 157 public void onServiceDisconnected(ComponentName name) { 158 // TODO(b/198684473): clean up the state after script executor disconnects 159 mTelemetryHandler.post(() -> { 160 unbindScriptExecutor(); 161 }); 162 } 163 }; 164 165 private final AbstractPublisher.PublisherListener mPublisherListener = 166 new AbstractPublisher.PublisherListener() { 167 @Override 168 public void onPublisherFailure( 169 @NonNull List<TelemetryProto.MetricsConfig> affectedConfigs, 170 @Nullable Throwable error) { 171 Slogf.w(CarLog.TAG_TELEMETRY, "Publisher failed", error); 172 // when a publisher fails, construct an TelemetryError result and send to client 173 String stackTrace = null; 174 if (error != null) { 175 stackTrace = Log.getStackTraceString(error); 176 } 177 TelemetryError telemetryError = buildTelemetryError( 178 TelemetryError.ErrorType.PUBLISHER_FAILED, "Publisher failed", stackTrace); 179 for (TelemetryProto.MetricsConfig config : affectedConfigs) { 180 // this will remove the MetricsConfig and notify the client of result 181 mDataBrokerListener.onReportFinished(config.getName(), telemetryError); 182 } 183 } 184 185 @Override 186 public void onConfigFinished(@NonNull TelemetryProto.MetricsConfig metricsConfig) { 187 String configName = metricsConfig.getName(); 188 Slogf.i(CarLog.TAG_TELEMETRY, 189 "Publisher sets MetricsConfig(" + configName + ") as finished"); 190 mDataBrokerListener.onReportFinished(configName); 191 } 192 }; 193 DataBrokerImpl( @onNull Context context, @NonNull PublisherFactory publisherFactory, @NonNull ResultStore resultStore, @NonNull TimingsTraceLog traceLog)194 public DataBrokerImpl( 195 @NonNull Context context, 196 @NonNull PublisherFactory publisherFactory, 197 @NonNull ResultStore resultStore, 198 @NonNull TimingsTraceLog traceLog) { 199 mContext = context; 200 mPublisherFactory = publisherFactory; 201 mResultStore = resultStore; 202 mScriptExecutorListener = new ScriptExecutorListener(this); 203 mPublisherFactory.initialize(mPublisherListener); 204 mScriptExecutionTraceLog = traceLog; 205 } 206 207 @Nullable findExecutorPackage()208 private String findExecutorPackage() { 209 PackageInfo info = null; 210 for (int i = 0; i < SCRIPT_EXECUTOR_PACKAGE_CANDIDATES.length; i++) { 211 try { 212 info = mContext.getPackageManager().getPackageInfo( 213 SCRIPT_EXECUTOR_PACKAGE_CANDIDATES[i], /* flags= */ 0); 214 if (info != null) { 215 break; 216 } 217 } catch (PackageManager.NameNotFoundException e) { 218 // ignore 219 } 220 } 221 if (info == null) { 222 return null; 223 } 224 return info.packageName; 225 } 226 bindScriptExecutor()227 private void bindScriptExecutor() { 228 // do not re-bind if broker is in a disabled state or if script executor is nonnull 229 if (mDisabled || mScriptExecutor != null) { 230 return; 231 } 232 String executorPackage = findExecutorPackage(); 233 if (executorPackage == null) { 234 Slogf.w(CarLog.TAG_TELEMETRY, "Cannot find executor package"); 235 return; 236 } 237 Intent intent = new Intent(); 238 intent.setComponent(new ComponentName(executorPackage, SCRIPT_EXECUTOR_CLASS)); 239 boolean success = mContext.bindServiceAsUser( 240 intent, 241 mServiceConnection, 242 Context.BIND_AUTO_CREATE, 243 UserHandle.SYSTEM); 244 if (success) { 245 mBindScriptExecutorAttempts = 0; // reset 246 return; 247 } 248 unbindScriptExecutor(); 249 mBindScriptExecutorAttempts++; 250 if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) { 251 Slogf.w(CarLog.TAG_TELEMETRY, 252 "failed to get valid connection to ScriptExecutor, retrying in " 253 + mBindScriptExecutorDelayMillis + "ms."); 254 mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR, 255 mBindScriptExecutorDelayMillis); 256 } else { 257 Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, " 258 + "disabling DataBroker"); 259 disableBroker(); 260 } 261 } 262 263 /** 264 * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from 265 * the telemetry thread. 266 */ unbindScriptExecutor()267 private void unbindScriptExecutor() { 268 // TODO(b/198648763): unbind from script executor when there is no work to do 269 // if a script is running while we unbind from ScriptExecutor, end trace log first 270 if (mCurrentMetricsConfigName != null) { 271 mScriptExecutionTraceLog.traceEnd(); 272 mCurrentMetricsConfigName = null; 273 } 274 mScriptExecutor = null; 275 try { 276 mContext.unbindService(mServiceConnection); 277 } catch (IllegalArgumentException e) { 278 // If ScriptExecutor is gone before unbinding, it will throw this exception 279 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e); 280 } 281 } 282 283 /** 284 * Enters into a disabled state because something irrecoverable happened. 285 * TODO(b/200841260): expose the state to the caller. 286 */ disableBroker()287 private void disableBroker() { 288 mDisabled = true; 289 // remove all MetricConfigs, disable all publishers, stop receiving data 290 List<String> keysToRemove = new ArrayList<>(); 291 for (String configName : mSubscriptionMap.keySet()) { 292 // get the metrics config from the DataSubscriber and store in a intermediate 293 // collection. It is not safe to modify a collection while iterating on it. 294 if (mSubscriptionMap.get(configName).size() != 0) { 295 keysToRemove.add(configName); 296 } 297 } 298 // remove metrics configurations 299 for (String configName : keysToRemove) { 300 removeMetricsConfig(configName); 301 } 302 mSubscriptionMap.clear(); 303 } 304 305 @Override addMetricsConfig( @onNull String metricsConfigName, @NonNull MetricsConfig metricsConfig)306 public void addMetricsConfig( 307 @NonNull String metricsConfigName, @NonNull MetricsConfig metricsConfig) { 308 // TODO(b/187743369): pass status back to caller 309 // if broker is disabled or metricsConfig already exists, do nothing 310 if (mDisabled || mSubscriptionMap.containsKey(metricsConfigName)) { 311 return; 312 } 313 // Create the subscribers for this metrics configuration 314 List<DataSubscriber> dataSubscribers = new ArrayList<>( 315 metricsConfig.getSubscribersList().size()); 316 for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) { 317 if (subscriber.getPriority() < 0) { 318 throw new IllegalArgumentException("Subscribers must have non-negative priority"); 319 } 320 // protobuf publisher to a concrete Publisher 321 AbstractPublisher publisher = mPublisherFactory.getPublisher( 322 subscriber.getPublisher().getPublisherCase()); 323 // create DataSubscriber from TelemetryProto.Subscriber 324 DataSubscriber dataSubscriber = new DataSubscriber( 325 this, 326 metricsConfig, 327 subscriber); 328 dataSubscribers.add(dataSubscriber); 329 // addDataSubscriber could throw an exception, let CarTelemetryService handle it 330 publisher.addDataSubscriber(dataSubscriber); 331 } 332 mSubscriptionMap.put(metricsConfigName, dataSubscribers); 333 } 334 335 @Override removeMetricsConfig(@onNull String metricsConfigName)336 public void removeMetricsConfig(@NonNull String metricsConfigName) { 337 // TODO(b/187743369): pass status back to caller 338 if (!mSubscriptionMap.containsKey(metricsConfigName)) { 339 return; 340 } 341 // get the subscriptions associated with this MetricsConfig, remove it from the map 342 List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(metricsConfigName); 343 // for each subscriber, remove it from publishers 344 for (DataSubscriber subscriber : dataSubscribers) { 345 AbstractPublisher publisher = mPublisherFactory.getPublisher( 346 subscriber.getPublisherParam().getPublisherCase()); 347 try { 348 publisher.removeDataSubscriber(subscriber); 349 } catch (IllegalArgumentException e) { 350 // It shouldn't happen, but if happens, let's just log it. 351 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e); 352 } 353 } 354 // Remove all the tasks associated with this metrics config. The underlying impl uses the 355 // weakly consistent iterator, which is thread-safe but does not freeze the collection while 356 // iterating, so it may or may not reflect any updates since the iterator was created. 357 // But since adding & polling from queue should happen in the same thread, the task queue 358 // should not be changed while tasks are being iterated and removed. 359 Iterator<ScriptExecutionTask> it = mTaskQueue.iterator(); 360 while (it.hasNext()) { 361 ScriptExecutionTask task = it.next(); 362 if (task.isAssociatedWithMetricsConfig(metricsConfigName)) { 363 mTaskQueue.remove(task); 364 mPublisherCountArray.append( 365 task.getPublisherType(), 366 mPublisherCountArray.get(task.getPublisherType()) - 1); 367 } 368 } 369 } 370 371 @Override removeAllMetricsConfigs()372 public void removeAllMetricsConfigs() { 373 mPublisherFactory.removeAllDataSubscribers(); 374 mSubscriptionMap.clear(); 375 mTaskQueue.clear(); 376 mPublisherCountArray.clear(); 377 } 378 379 @Override addTaskToQueue(@onNull ScriptExecutionTask task)380 public int addTaskToQueue(@NonNull ScriptExecutionTask task) { 381 if (mDisabled) { 382 return mPublisherCountArray.get(task.getPublisherType()); 383 } 384 mTaskQueue.add(task); 385 mPublisherCountArray.append( 386 task.getPublisherType(), 387 mPublisherCountArray.get(task.getPublisherType()) + 1); 388 scheduleNextTask(); 389 return mPublisherCountArray.get(task.getPublisherType()); 390 } 391 392 /** 393 * This method can be called from any thread. 394 * It is possible for this method to be invoked from different threads at the same time, but 395 * it is not possible to schedule the same task twice, because the handler handles message 396 * in the order they come in, this means the task will be polled sequentially instead of 397 * concurrently. Every task that is scheduled and run will be distinct. 398 * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask 399 * will also have different threading behavior. Update javadoc when the behavior is decided. 400 */ 401 @Override scheduleNextTask()402 public void scheduleNextTask() { 403 if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) { 404 return; 405 } 406 mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK); 407 } 408 409 @Override setDataBrokerListener(@onNull DataBrokerListener dataBrokerListener)410 public void setDataBrokerListener(@NonNull DataBrokerListener dataBrokerListener) { 411 if (mDisabled) { 412 return; 413 } 414 mDataBrokerListener = dataBrokerListener; 415 } 416 417 @Override setTaskExecutionPriority(int priority)418 public void setTaskExecutionPriority(int priority) { 419 if (mDisabled) { 420 return; 421 } 422 if (priority == mPriority) { 423 return; 424 } 425 mPriority = priority; 426 scheduleNextTask(); // when priority updates, schedule a task which checks task queue 427 } 428 429 @VisibleForTesting 430 @NonNull getSubscriptionMap()431 ArrayMap<String, List<DataSubscriber>> getSubscriptionMap() { 432 return new ArrayMap<>(mSubscriptionMap); 433 } 434 435 @VisibleForTesting 436 @NonNull getTelemetryHandler()437 Handler getTelemetryHandler() { 438 return mTelemetryHandler; 439 } 440 441 @VisibleForTesting 442 @NonNull getTaskQueue()443 PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() { 444 return mTaskQueue; 445 } 446 447 /** 448 * Polls and runs a task from the head of the priority queue if the queue is nonempty and the 449 * head of the queue has priority higher than or equal to the current priority. A higher 450 * priority is denoted by a lower priority number, so head of the queue should have equal or 451 * lower priority number to be polled. 452 */ pollAndExecuteTask()453 private void pollAndExecuteTask() { 454 // check databroker state is ready to run script 455 if (mDisabled || mCurrentMetricsConfigName != null) { 456 Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, disabled or no config."); 457 return; 458 } 459 // check task is valid and ready to be run 460 ScriptExecutionTask task = mTaskQueue.peek(); 461 if (task == null || task.getPriority() > mPriority) { 462 Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, either task is null or low priority"); 463 return; 464 } 465 // if script executor is null, bind service 466 if (mScriptExecutor == null) { 467 Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor"); 468 // upon successful binding, a task will be scheduled to run if there are any 469 mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR); 470 return; 471 } 472 mTaskQueue.poll(); // remove task from queue 473 mPublisherCountArray.append( 474 task.getPublisherType(), 475 mPublisherCountArray.get(task.getPublisherType()) - 1); 476 477 if (task.bypassScriptExecutor()) { 478 // delegate to DataBrokerListener to handle storing data and scheduling next task 479 mDataBrokerListener.onMetricsReport(task.getMetricsConfig().getName(), 480 task.getData(), /* state= */ null); 481 return; 482 } 483 484 // update current config name because a script is currently running 485 mCurrentMetricsConfigName = task.getMetricsConfig().getName(); 486 mScriptExecutionTraceLog.traceBegin( 487 "executing script " + mCurrentMetricsConfigName); 488 try { 489 if (task.isLargeData()) { 490 if (DEBUG) { 491 Slogf.d(CarLog.TAG_TELEMETRY, 492 "Running with large func %s of %s in ScriptExecutor.", 493 task.getHandlerName(), 494 mCurrentMetricsConfigName); 495 } 496 invokeScriptForLargeInput(task); 497 } else if (task.isBundleList()) { 498 if (DEBUG) { 499 Slogf.d(CarLog.TAG_TELEMETRY, 500 "Running with bundle list func %s of %s in ScriptExecutor.", 501 task.getHandlerName(), 502 mCurrentMetricsConfigName); 503 } 504 invokeScriptForBundleList(task); 505 } else { 506 if (DEBUG) { 507 Slogf.d(CarLog.TAG_TELEMETRY, "Running func %s of %s in ScriptExecutor.", 508 task.getHandlerName(), 509 mCurrentMetricsConfigName); 510 } 511 mScriptExecutor.invokeScript( 512 task.getMetricsConfig().getScript(), 513 task.getHandlerName(), 514 task.getData(), 515 mResultStore.getInterimResult(mCurrentMetricsConfigName), 516 mScriptExecutorListener); 517 } 518 mTelemetryHandler.sendEmptyMessageDelayed( 519 MSG_STOP_HANGING_SCRIPT, MAX_SCRIPT_EXECUTION_TIME_MILLIS); 520 } catch (RemoteException e) { 521 mScriptExecutionTraceLog.traceEnd(); 522 Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e); 523 unbindScriptExecutor(); 524 addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor 525 } catch (IOException e) { 526 mScriptExecutionTraceLog.traceEnd(); 527 Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data" 528 + " to ScriptExecutor. Skipping the published data", e); 529 mCurrentMetricsConfigName = null; 530 scheduleNextTask(); // drop this task and schedule the next one 531 } 532 } 533 534 /** 535 * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the 536 * script input to the pipe. 537 * 538 * @param task containing all the necessary parameters for ScriptExecutor API. 539 * @throws IOException if cannot create pipe or cannot write the bundle to pipe. 540 * @throws RemoteException if ScriptExecutor failed. 541 */ invokeScriptForLargeInput(@onNull ScriptExecutionTask task)542 private void invokeScriptForLargeInput(@NonNull ScriptExecutionTask task) 543 throws IOException, RemoteException { 544 ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); 545 ParcelFileDescriptor readFd = fds[0]; 546 ParcelFileDescriptor writeFd = fds[1]; 547 try { 548 mScriptExecutor.invokeScriptForLargeInput( 549 task.getMetricsConfig().getScript(), 550 task.getHandlerName(), 551 readFd, 552 mResultStore.getInterimResult(mCurrentMetricsConfigName), 553 mScriptExecutorListener); 554 } catch (RemoteException e) { 555 IoUtils.closeQuietly(readFd); 556 IoUtils.closeQuietly(writeFd); 557 throw e; 558 } 559 IoUtils.closeQuietly(readFd); 560 561 Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe"); 562 try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) { 563 task.getData().writeToStream(outputStream); 564 } 565 } 566 567 /** 568 * Sends bundle list with LargeParcelable mechanism. 569 * 570 * @param task containing all the necessary parameters for ScriptExecutor API. 571 * @throws RemoteException if ScriptExecutor failed. 572 */ invokeScriptForBundleList(@onNull ScriptExecutionTask task)573 private void invokeScriptForBundleList(@NonNull ScriptExecutionTask task) 574 throws RemoteException { 575 BundleList bl = new BundleList(); 576 bl.bundles = task.getBundleList(); 577 bl = (BundleList) LargeParcelable.toLargeParcelable( 578 bl, () -> { 579 BundleList bundleList = new BundleList(); 580 bundleList.bundles = new ArrayList<>(); 581 return bundleList; 582 }); 583 mScriptExecutor.invokeScriptForBundleList( 584 task.getMetricsConfig().getScript(), 585 task.getHandlerName(), 586 bl, 587 mResultStore.getInterimResult(mCurrentMetricsConfigName), 588 mScriptExecutorListener); 589 } 590 buildTelemetryError( @onNull TelemetryError.ErrorType errorType, @NonNull String message, @Nullable String stackTrace)591 private TelemetryError buildTelemetryError( 592 @NonNull TelemetryError.ErrorType errorType, 593 @NonNull String message, 594 @Nullable String stackTrace) { 595 TelemetryError.Builder error = TelemetryError.newBuilder() 596 .setErrorType(errorType) 597 .setMessage(message); 598 if (stackTrace != null) { 599 error.setStackTrace(stackTrace); 600 } 601 return error.build(); 602 } 603 604 /** 605 * This helper method should be called as soon as script execution returns. 606 * It returns the name of the MetricsConfig whose script returned. 607 */ endScriptExecution()608 private String endScriptExecution() { 609 mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running 610 mTelemetryHandler.removeMessages(MSG_STOP_HANGING_SCRIPT); // script did not hang 611 // get and set the mCurrentMetricsConfigName to null 612 String configName = mCurrentMetricsConfigName; 613 mCurrentMetricsConfigName = null; 614 return configName; 615 } 616 617 /** Stores final metrics and schedules the next task. */ onScriptFinished(@onNull PersistableBundle result)618 private void onScriptFinished(@NonNull PersistableBundle result) { 619 if (DEBUG) { 620 Slogf.d(CarLog.TAG_TELEMETRY, "A script finished, storing the final result."); 621 } 622 mTelemetryHandler.post(() -> { 623 String configName = endScriptExecution(); 624 if (configName == null) { 625 return; 626 } 627 // delegate to DataBrokerListener to handle storing data and scheduling next task 628 mDataBrokerListener.onReportFinished(configName, result); 629 }); 630 } 631 632 /** Stores interim metrics and schedules the next task. */ onScriptSuccess(@onNull PersistableBundle stateToPersist)633 private void onScriptSuccess(@NonNull PersistableBundle stateToPersist) { 634 if (DEBUG) { 635 Slogf.d(CarLog.TAG_TELEMETRY, "A script succeeded, storing the interim result."); 636 } 637 mTelemetryHandler.post(() -> { 638 String configName = endScriptExecution(); 639 if (configName == null) { 640 return; 641 } 642 // delegate to DataBrokerListener to handle storing data and scheduling next task 643 mDataBrokerListener.onEventConsumed(configName, stateToPersist); 644 }); 645 } 646 647 /** Stores telemetry error and schedules the next task. */ onScriptError( int errorType, @NonNull String message, @Nullable String stackTrace)648 private void onScriptError( 649 int errorType, @NonNull String message, @Nullable String stackTrace) { 650 if (DEBUG) { 651 Slogf.d(CarLog.TAG_TELEMETRY, "A script failed: %d %s\n%s", 652 errorType, message, stackTrace); 653 } 654 mTelemetryHandler.post(() -> { 655 String configName = endScriptExecution(); 656 if (configName == null) { 657 return; 658 } 659 // delegate to DataBrokerListener to handle storing data and scheduling next task 660 mDataBrokerListener.onReportFinished( 661 configName, 662 buildTelemetryError( 663 TelemetryError.ErrorType.forNumber(errorType), 664 message, 665 stackTrace)); 666 }); 667 } 668 onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)669 private void onMetricsReport( 670 @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) { 671 if (DEBUG) { 672 Slogf.d(CarLog.TAG_TELEMETRY, "A script produced a report without finishing."); 673 } 674 mTelemetryHandler.post(() -> { 675 String configName = endScriptExecution(); 676 if (configName == null) { 677 return; 678 } 679 mDataBrokerListener.onMetricsReport(configName, report, stateToPersist); 680 }); 681 } 682 683 /** Listens for script execution status. Methods are called on the binder thread. */ 684 private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub { 685 private final WeakReference<DataBrokerImpl> mWeakDataBroker; 686 ScriptExecutorListener(@onNull DataBrokerImpl dataBroker)687 private ScriptExecutorListener(@NonNull DataBrokerImpl dataBroker) { 688 mWeakDataBroker = new WeakReference<>(dataBroker); 689 } 690 691 @Override onScriptFinished(@onNull PersistableBundle result)692 public void onScriptFinished(@NonNull PersistableBundle result) { 693 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 694 if (dataBroker == null) { 695 return; 696 } 697 dataBroker.onScriptFinished(result); 698 } 699 700 @Override onSuccess(@onNull PersistableBundle stateToPersist)701 public void onSuccess(@NonNull PersistableBundle stateToPersist) { 702 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 703 if (dataBroker == null) { 704 return; 705 } 706 dataBroker.onScriptSuccess(stateToPersist); 707 } 708 709 @Override onError(int errorType, @NonNull String message, @Nullable String stackTrace)710 public void onError(int errorType, @NonNull String message, @Nullable String stackTrace) { 711 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 712 if (dataBroker == null) { 713 return; 714 } 715 dataBroker.onScriptError(errorType, message, stackTrace); 716 } 717 718 @Override onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)719 public void onMetricsReport( 720 @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) { 721 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 722 if (dataBroker == null) { 723 return; 724 } 725 dataBroker.onMetricsReport(report, stateToPersist); 726 } 727 } 728 729 /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */ 730 class TaskHandler extends Handler { TaskHandler(@onNull Looper looper)731 TaskHandler(@NonNull Looper looper) { 732 super(looper); 733 } 734 735 /** 736 * Handles a message depending on the message ID. 737 * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a 738 * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when 739 * priority updates, when a new task is added to the priority queue, and when a task 740 * finishes running. 741 */ 742 @Override handleMessage(@onNull Message msg)743 public void handleMessage(@NonNull Message msg) { 744 switch (msg.what) { 745 case MSG_HANDLE_TASK: 746 pollAndExecuteTask(); // run the next task 747 break; 748 case MSG_BIND_TO_SCRIPT_EXECUTOR: 749 bindScriptExecutor(); 750 break; 751 case MSG_STOP_HANGING_SCRIPT: 752 // TODO(b/223224704): log error 753 unbindScriptExecutor(); 754 scheduleNextTask(); 755 break; 756 default: 757 Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message."); 758 } 759 } 760 } 761 } 762