1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car.telemetry.databroker;
18 
19 import static com.android.car.telemetry.CarTelemetryService.DEBUG;
20 
21 import android.annotation.NonNull;
22 import android.annotation.Nullable;
23 import android.car.builtin.util.Slogf;
24 import android.car.builtin.util.TimingsTraceLog;
25 import android.car.telemetry.TelemetryProto;
26 import android.car.telemetry.TelemetryProto.MetricsConfig;
27 import android.car.telemetry.TelemetryProto.TelemetryError;
28 import android.content.ComponentName;
29 import android.content.Context;
30 import android.content.Intent;
31 import android.content.ServiceConnection;
32 import android.content.pm.PackageInfo;
33 import android.content.pm.PackageManager;
34 import android.os.Handler;
35 import android.os.HandlerThread;
36 import android.os.IBinder;
37 import android.os.Looper;
38 import android.os.Message;
39 import android.os.ParcelFileDescriptor;
40 import android.os.PersistableBundle;
41 import android.os.RemoteException;
42 import android.os.UserHandle;
43 import android.util.ArrayMap;
44 import android.util.Log;
45 import android.util.SparseIntArray;
46 
47 import com.android.car.CarLog;
48 import com.android.car.CarServiceUtils;
49 import com.android.car.internal.LargeParcelable;
50 import com.android.car.telemetry.CarTelemetryService;
51 import com.android.car.telemetry.ResultStore;
52 import com.android.car.telemetry.publisher.AbstractPublisher;
53 import com.android.car.telemetry.publisher.PublisherFactory;
54 import com.android.car.telemetry.scriptexecutorinterface.BundleList;
55 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor;
56 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener;
57 import com.android.car.telemetry.util.IoUtils;
58 import com.android.internal.annotations.VisibleForTesting;
59 
60 import java.io.IOException;
61 import java.io.OutputStream;
62 import java.lang.ref.WeakReference;
63 import java.util.ArrayList;
64 import java.util.Iterator;
65 import java.util.List;
66 import java.util.concurrent.PriorityBlockingQueue;
67 
68 /**
69  * Implementation of the data path component of CarTelemetryService. Forwards the published data
70  * from publishers to consumers subject to the Controller's decision.
71  * All methods should be called from the telemetry thread unless otherwise specified as thread-safe.
72  */
73 public class DataBrokerImpl implements DataBroker {
74 
75     @VisibleForTesting
76     static final int MSG_HANDLE_TASK = 1;
77     @VisibleForTesting
78     static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2;
79     @VisibleForTesting
80     static final int MSG_STOP_HANGING_SCRIPT = 3;
81 
82     /** Bind to script executor 5 times before entering disabled state. */
83     private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5;
84 
85     /** Maximum wait time for a script to finish. */
86     private static final long MAX_SCRIPT_EXECUTION_TIME_MILLIS = 30_000L; // 30 seconds
87 
88     private static final String[] SCRIPT_EXECUTOR_PACKAGE_CANDIDATES =
89             {"com.android.car.scriptexecutor", "com.google.android.car.scriptexecutor"};
90     private static final String SCRIPT_EXECUTOR_CLASS =
91             "com.android.car.scriptexecutor.ScriptExecutor";
92 
93     private final Context mContext;
94     private final PublisherFactory mPublisherFactory;
95     private final ResultStore mResultStore;
96     private final ScriptExecutorListener mScriptExecutorListener;
97     private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread(
98             CarTelemetryService.class.getSimpleName());
99     private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper());
100 
101     /** Thread-safe priority queue for scheduling tasks. */
102     private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue =
103             new PriorityBlockingQueue<>();
104 
105     /**
106      * Index is the type of {@link TelemetryProto.Publisher}, value is the number of tasks pending
107      * script execution that are produced by that publisher.
108      */
109     private final SparseIntArray mPublisherCountArray = new SparseIntArray();
110 
111     /**
112      * Maps MetricsConfig name to its subscriptions. This map is useful for removing MetricsConfigs.
113      */
114     private final ArrayMap<String, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>();
115 
116     /**
117      * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent
118      * doing futile work.
119      */
120     private boolean mDisabled = false;
121 
122     /** Current number of attempts to bind to ScriptExecutor. */
123     private int mBindScriptExecutorAttempts = 0;
124 
125     /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */
126     private int mPriority = 1;
127 
128     /** Waiting period between attempts to bind script executor. Can be shortened for tests. */
129     @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L;
130 
131     /**
132      * Name of the {@link MetricsConfig} that is currently running.
133      * A non-null value indicates ScriptExecutor is currently running this config, which means
134      * DataBroker should not make another ScriptExecutor binder call.
135      */
136     private String mCurrentMetricsConfigName;
137     private IScriptExecutor mScriptExecutor;
138     private DataBrokerListener mDataBrokerListener;
139 
140     /**
141      * Used only for the purpose of tracking the duration of running a script. The duration
142      * starts before the ScriptExecutor binder call and ends when a status is returned via
143      * ScriptExecutorListener or when the binder call throws an exception.
144      */
145     private TimingsTraceLog mScriptExecutionTraceLog;
146 
147     private final ServiceConnection mServiceConnection = new ServiceConnection() {
148         @Override
149         public void onServiceConnected(ComponentName name, IBinder service) {
150             mTelemetryHandler.post(() -> {
151                 mScriptExecutor = IScriptExecutor.Stub.asInterface(service);
152                 scheduleNextTask();
153             });
154         }
155 
156         @Override
157         public void onServiceDisconnected(ComponentName name) {
158             // TODO(b/198684473): clean up the state after script executor disconnects
159             mTelemetryHandler.post(() -> {
160                 unbindScriptExecutor();
161             });
162         }
163     };
164 
165     private final AbstractPublisher.PublisherListener mPublisherListener =
166             new AbstractPublisher.PublisherListener() {
167         @Override
168         public void onPublisherFailure(
169                 @NonNull List<TelemetryProto.MetricsConfig> affectedConfigs,
170                 @Nullable Throwable error) {
171             Slogf.w(CarLog.TAG_TELEMETRY, "Publisher failed", error);
172             // when a publisher fails, construct an TelemetryError result and send to client
173             String stackTrace = null;
174             if (error != null) {
175                 stackTrace = Log.getStackTraceString(error);
176             }
177             TelemetryError telemetryError = buildTelemetryError(
178                     TelemetryError.ErrorType.PUBLISHER_FAILED, "Publisher failed", stackTrace);
179             for (TelemetryProto.MetricsConfig config : affectedConfigs) {
180                 // this will remove the MetricsConfig and notify the client of result
181                 mDataBrokerListener.onReportFinished(config.getName(), telemetryError);
182             }
183         }
184 
185         @Override
186         public void onConfigFinished(@NonNull TelemetryProto.MetricsConfig metricsConfig) {
187             String configName = metricsConfig.getName();
188             Slogf.i(CarLog.TAG_TELEMETRY,
189                     "Publisher sets MetricsConfig(" + configName + ") as finished");
190             mDataBrokerListener.onReportFinished(configName);
191         }
192     };
193 
DataBrokerImpl( @onNull Context context, @NonNull PublisherFactory publisherFactory, @NonNull ResultStore resultStore, @NonNull TimingsTraceLog traceLog)194     public DataBrokerImpl(
195             @NonNull Context context,
196             @NonNull PublisherFactory publisherFactory,
197             @NonNull ResultStore resultStore,
198             @NonNull TimingsTraceLog traceLog) {
199         mContext = context;
200         mPublisherFactory = publisherFactory;
201         mResultStore = resultStore;
202         mScriptExecutorListener = new ScriptExecutorListener(this);
203         mPublisherFactory.initialize(mPublisherListener);
204         mScriptExecutionTraceLog = traceLog;
205     }
206 
207     @Nullable
findExecutorPackage()208     private String findExecutorPackage() {
209         PackageInfo info = null;
210         for (int i = 0; i < SCRIPT_EXECUTOR_PACKAGE_CANDIDATES.length; i++) {
211             try {
212                 info = mContext.getPackageManager().getPackageInfo(
213                         SCRIPT_EXECUTOR_PACKAGE_CANDIDATES[i], /* flags= */ 0);
214                 if (info != null) {
215                     break;
216                 }
217             } catch (PackageManager.NameNotFoundException e) {
218                 // ignore
219             }
220         }
221         if (info == null) {
222             return null;
223         }
224         return info.packageName;
225     }
226 
bindScriptExecutor()227     private void bindScriptExecutor() {
228         // do not re-bind if broker is in a disabled state or if script executor is nonnull
229         if (mDisabled || mScriptExecutor != null) {
230             return;
231         }
232         String executorPackage = findExecutorPackage();
233         if (executorPackage == null) {
234             Slogf.w(CarLog.TAG_TELEMETRY, "Cannot find executor package");
235             return;
236         }
237         Intent intent = new Intent();
238         intent.setComponent(new ComponentName(executorPackage, SCRIPT_EXECUTOR_CLASS));
239         boolean success = mContext.bindServiceAsUser(
240                 intent,
241                 mServiceConnection,
242                 Context.BIND_AUTO_CREATE,
243                 UserHandle.SYSTEM);
244         if (success) {
245             mBindScriptExecutorAttempts = 0; // reset
246             return;
247         }
248         unbindScriptExecutor();
249         mBindScriptExecutorAttempts++;
250         if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) {
251             Slogf.w(CarLog.TAG_TELEMETRY,
252                     "failed to get valid connection to ScriptExecutor, retrying in "
253                             + mBindScriptExecutorDelayMillis + "ms.");
254             mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR,
255                     mBindScriptExecutorDelayMillis);
256         } else {
257             Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, "
258                     + "disabling DataBroker");
259             disableBroker();
260         }
261     }
262 
263     /**
264      * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from
265      * the telemetry thread.
266      */
unbindScriptExecutor()267     private void unbindScriptExecutor() {
268         // TODO(b/198648763): unbind from script executor when there is no work to do
269         // if a script is running while we unbind from ScriptExecutor, end trace log first
270         if (mCurrentMetricsConfigName != null) {
271             mScriptExecutionTraceLog.traceEnd();
272             mCurrentMetricsConfigName = null;
273         }
274         mScriptExecutor = null;
275         try {
276             mContext.unbindService(mServiceConnection);
277         } catch (IllegalArgumentException e) {
278             // If ScriptExecutor is gone before unbinding, it will throw this exception
279             Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e);
280         }
281     }
282 
283     /**
284      * Enters into a disabled state because something irrecoverable happened.
285      * TODO(b/200841260): expose the state to the caller.
286      */
disableBroker()287     private void disableBroker() {
288         mDisabled = true;
289         // remove all MetricConfigs, disable all publishers, stop receiving data
290         List<String> keysToRemove = new ArrayList<>();
291         for (String configName : mSubscriptionMap.keySet()) {
292             // get the metrics config from the DataSubscriber and store in a intermediate
293             // collection. It is not safe to modify a collection while iterating on it.
294             if (mSubscriptionMap.get(configName).size() != 0) {
295                 keysToRemove.add(configName);
296             }
297         }
298         // remove metrics configurations
299         for (String configName : keysToRemove) {
300             removeMetricsConfig(configName);
301         }
302         mSubscriptionMap.clear();
303     }
304 
305     @Override
addMetricsConfig( @onNull String metricsConfigName, @NonNull MetricsConfig metricsConfig)306     public void addMetricsConfig(
307             @NonNull String metricsConfigName, @NonNull MetricsConfig metricsConfig) {
308         // TODO(b/187743369): pass status back to caller
309         // if broker is disabled or metricsConfig already exists, do nothing
310         if (mDisabled || mSubscriptionMap.containsKey(metricsConfigName)) {
311             return;
312         }
313         // Create the subscribers for this metrics configuration
314         List<DataSubscriber> dataSubscribers = new ArrayList<>(
315                 metricsConfig.getSubscribersList().size());
316         for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) {
317             if (subscriber.getPriority() < 0) {
318                 throw new IllegalArgumentException("Subscribers must have non-negative priority");
319             }
320             // protobuf publisher to a concrete Publisher
321             AbstractPublisher publisher = mPublisherFactory.getPublisher(
322                     subscriber.getPublisher().getPublisherCase());
323             // create DataSubscriber from TelemetryProto.Subscriber
324             DataSubscriber dataSubscriber = new DataSubscriber(
325                     this,
326                     metricsConfig,
327                     subscriber);
328             dataSubscribers.add(dataSubscriber);
329             // addDataSubscriber could throw an exception, let CarTelemetryService handle it
330             publisher.addDataSubscriber(dataSubscriber);
331         }
332         mSubscriptionMap.put(metricsConfigName, dataSubscribers);
333     }
334 
335     @Override
removeMetricsConfig(@onNull String metricsConfigName)336     public void removeMetricsConfig(@NonNull String metricsConfigName) {
337         // TODO(b/187743369): pass status back to caller
338         if (!mSubscriptionMap.containsKey(metricsConfigName)) {
339             return;
340         }
341         // get the subscriptions associated with this MetricsConfig, remove it from the map
342         List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(metricsConfigName);
343         // for each subscriber, remove it from publishers
344         for (DataSubscriber subscriber : dataSubscribers) {
345             AbstractPublisher publisher = mPublisherFactory.getPublisher(
346                     subscriber.getPublisherParam().getPublisherCase());
347             try {
348                 publisher.removeDataSubscriber(subscriber);
349             } catch (IllegalArgumentException e) {
350                 // It shouldn't happen, but if happens, let's just log it.
351                 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e);
352             }
353         }
354         // Remove all the tasks associated with this metrics config. The underlying impl uses the
355         // weakly consistent iterator, which is thread-safe but does not freeze the collection while
356         // iterating, so it may or may not reflect any updates since the iterator was created.
357         // But since adding & polling from queue should happen in the same thread, the task queue
358         // should not be changed while tasks are being iterated and removed.
359         Iterator<ScriptExecutionTask> it = mTaskQueue.iterator();
360         while (it.hasNext()) {
361             ScriptExecutionTask task = it.next();
362             if (task.isAssociatedWithMetricsConfig(metricsConfigName)) {
363                 mTaskQueue.remove(task);
364                 mPublisherCountArray.append(
365                         task.getPublisherType(),
366                         mPublisherCountArray.get(task.getPublisherType()) - 1);
367             }
368         }
369     }
370 
371     @Override
removeAllMetricsConfigs()372     public void removeAllMetricsConfigs() {
373         mPublisherFactory.removeAllDataSubscribers();
374         mSubscriptionMap.clear();
375         mTaskQueue.clear();
376         mPublisherCountArray.clear();
377     }
378 
379     @Override
addTaskToQueue(@onNull ScriptExecutionTask task)380     public int addTaskToQueue(@NonNull ScriptExecutionTask task) {
381         if (mDisabled) {
382             return mPublisherCountArray.get(task.getPublisherType());
383         }
384         mTaskQueue.add(task);
385         mPublisherCountArray.append(
386                 task.getPublisherType(),
387                 mPublisherCountArray.get(task.getPublisherType()) + 1);
388         scheduleNextTask();
389         return mPublisherCountArray.get(task.getPublisherType());
390     }
391 
392     /**
393      * This method can be called from any thread.
394      * It is possible for this method to be invoked from different threads at the same time, but
395      * it is not possible to schedule the same task twice, because the handler handles message
396      * in the order they come in, this means the task will be polled sequentially instead of
397      * concurrently. Every task that is scheduled and run will be distinct.
398      * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask
399      *  will also have different threading behavior. Update javadoc when the behavior is decided.
400      */
401     @Override
scheduleNextTask()402     public void scheduleNextTask() {
403         if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) {
404             return;
405         }
406         mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK);
407     }
408 
409     @Override
setDataBrokerListener(@onNull DataBrokerListener dataBrokerListener)410     public void setDataBrokerListener(@NonNull DataBrokerListener dataBrokerListener) {
411         if (mDisabled) {
412             return;
413         }
414         mDataBrokerListener = dataBrokerListener;
415     }
416 
417     @Override
setTaskExecutionPriority(int priority)418     public void setTaskExecutionPriority(int priority) {
419         if (mDisabled) {
420             return;
421         }
422         if (priority == mPriority) {
423             return;
424         }
425         mPriority = priority;
426         scheduleNextTask(); // when priority updates, schedule a task which checks task queue
427     }
428 
429     @VisibleForTesting
430     @NonNull
getSubscriptionMap()431     ArrayMap<String, List<DataSubscriber>> getSubscriptionMap() {
432         return new ArrayMap<>(mSubscriptionMap);
433     }
434 
435     @VisibleForTesting
436     @NonNull
getTelemetryHandler()437     Handler getTelemetryHandler() {
438         return mTelemetryHandler;
439     }
440 
441     @VisibleForTesting
442     @NonNull
getTaskQueue()443     PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() {
444         return mTaskQueue;
445     }
446 
447     /**
448      * Polls and runs a task from the head of the priority queue if the queue is nonempty and the
449      * head of the queue has priority higher than or equal to the current priority. A higher
450      * priority is denoted by a lower priority number, so head of the queue should have equal or
451      * lower priority number to be polled.
452      */
pollAndExecuteTask()453     private void pollAndExecuteTask() {
454         // check databroker state is ready to run script
455         if (mDisabled || mCurrentMetricsConfigName != null) {
456             Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, disabled or no config.");
457             return;
458         }
459         // check task is valid and ready to be run
460         ScriptExecutionTask task = mTaskQueue.peek();
461         if (task == null || task.getPriority() > mPriority) {
462             Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, either task is null or low priority");
463             return;
464         }
465         // if script executor is null, bind service
466         if (mScriptExecutor == null) {
467             Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor");
468             // upon successful binding, a task will be scheduled to run if there are any
469             mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
470             return;
471         }
472         mTaskQueue.poll(); // remove task from queue
473         mPublisherCountArray.append(
474                 task.getPublisherType(),
475                 mPublisherCountArray.get(task.getPublisherType()) - 1);
476 
477         if (task.bypassScriptExecutor()) {
478             // delegate to DataBrokerListener to handle storing data and scheduling next task
479             mDataBrokerListener.onMetricsReport(task.getMetricsConfig().getName(),
480                     task.getData(), /* state= */ null);
481             return;
482         }
483 
484         // update current config name because a script is currently running
485         mCurrentMetricsConfigName = task.getMetricsConfig().getName();
486         mScriptExecutionTraceLog.traceBegin(
487                 "executing script " + mCurrentMetricsConfigName);
488         try {
489             if (task.isLargeData()) {
490                 if (DEBUG) {
491                     Slogf.d(CarLog.TAG_TELEMETRY,
492                             "Running with large func %s of %s in ScriptExecutor.",
493                             task.getHandlerName(),
494                             mCurrentMetricsConfigName);
495                 }
496                 invokeScriptForLargeInput(task);
497             } else if (task.isBundleList()) {
498                 if (DEBUG) {
499                     Slogf.d(CarLog.TAG_TELEMETRY,
500                             "Running with bundle list func %s of %s in ScriptExecutor.",
501                             task.getHandlerName(),
502                             mCurrentMetricsConfigName);
503                 }
504                 invokeScriptForBundleList(task);
505             } else {
506                 if (DEBUG) {
507                     Slogf.d(CarLog.TAG_TELEMETRY, "Running func %s of %s in ScriptExecutor.",
508                             task.getHandlerName(),
509                             mCurrentMetricsConfigName);
510                 }
511                 mScriptExecutor.invokeScript(
512                         task.getMetricsConfig().getScript(),
513                         task.getHandlerName(),
514                         task.getData(),
515                         mResultStore.getInterimResult(mCurrentMetricsConfigName),
516                         mScriptExecutorListener);
517             }
518             mTelemetryHandler.sendEmptyMessageDelayed(
519                     MSG_STOP_HANGING_SCRIPT, MAX_SCRIPT_EXECUTION_TIME_MILLIS);
520         } catch (RemoteException e) {
521             mScriptExecutionTraceLog.traceEnd();
522             Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
523             unbindScriptExecutor();
524             addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor
525         } catch (IOException e) {
526             mScriptExecutionTraceLog.traceEnd();
527             Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data"
528                     + " to ScriptExecutor. Skipping the published data", e);
529             mCurrentMetricsConfigName = null;
530             scheduleNextTask(); // drop this task and schedule the next one
531         }
532     }
533 
534     /**
535      * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the
536      * script input to the pipe.
537      *
538      * @param task containing all the necessary parameters for ScriptExecutor API.
539      * @throws IOException if cannot create pipe or cannot write the bundle to pipe.
540      * @throws RemoteException if ScriptExecutor failed.
541      */
invokeScriptForLargeInput(@onNull ScriptExecutionTask task)542     private void invokeScriptForLargeInput(@NonNull ScriptExecutionTask task)
543             throws IOException, RemoteException {
544         ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
545         ParcelFileDescriptor readFd = fds[0];
546         ParcelFileDescriptor writeFd = fds[1];
547         try {
548             mScriptExecutor.invokeScriptForLargeInput(
549                     task.getMetricsConfig().getScript(),
550                     task.getHandlerName(),
551                     readFd,
552                     mResultStore.getInterimResult(mCurrentMetricsConfigName),
553                     mScriptExecutorListener);
554         } catch (RemoteException e) {
555             IoUtils.closeQuietly(readFd);
556             IoUtils.closeQuietly(writeFd);
557             throw e;
558         }
559         IoUtils.closeQuietly(readFd);
560 
561         Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe");
562         try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) {
563             task.getData().writeToStream(outputStream);
564         }
565     }
566 
567     /**
568      * Sends bundle list with LargeParcelable mechanism.
569      *
570      * @param task containing all the necessary parameters for ScriptExecutor API.
571      * @throws RemoteException if ScriptExecutor failed.
572      */
invokeScriptForBundleList(@onNull ScriptExecutionTask task)573     private void invokeScriptForBundleList(@NonNull ScriptExecutionTask task)
574             throws RemoteException {
575         BundleList bl = new BundleList();
576         bl.bundles = task.getBundleList();
577         bl = (BundleList) LargeParcelable.toLargeParcelable(
578                 bl, () -> {
579                     BundleList bundleList = new BundleList();
580                     bundleList.bundles = new ArrayList<>();
581                     return bundleList;
582                 });
583         mScriptExecutor.invokeScriptForBundleList(
584                 task.getMetricsConfig().getScript(),
585                 task.getHandlerName(),
586                 bl,
587                 mResultStore.getInterimResult(mCurrentMetricsConfigName),
588                 mScriptExecutorListener);
589     }
590 
buildTelemetryError( @onNull TelemetryError.ErrorType errorType, @NonNull String message, @Nullable String stackTrace)591     private TelemetryError buildTelemetryError(
592             @NonNull TelemetryError.ErrorType errorType,
593             @NonNull String message,
594             @Nullable String stackTrace) {
595         TelemetryError.Builder error = TelemetryError.newBuilder()
596                 .setErrorType(errorType)
597                 .setMessage(message);
598         if (stackTrace != null) {
599             error.setStackTrace(stackTrace);
600         }
601         return error.build();
602     }
603 
604     /**
605      * This helper method should be called as soon as script execution returns.
606      * It returns the name of the MetricsConfig whose script returned.
607      */
endScriptExecution()608     private String endScriptExecution() {
609         mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running
610         mTelemetryHandler.removeMessages(MSG_STOP_HANGING_SCRIPT); // script did not hang
611         // get and set the mCurrentMetricsConfigName to null
612         String configName = mCurrentMetricsConfigName;
613         mCurrentMetricsConfigName = null;
614         return configName;
615     }
616 
617     /** Stores final metrics and schedules the next task. */
onScriptFinished(@onNull PersistableBundle result)618     private void onScriptFinished(@NonNull PersistableBundle result) {
619         if (DEBUG) {
620             Slogf.d(CarLog.TAG_TELEMETRY, "A script finished, storing the final result.");
621         }
622         mTelemetryHandler.post(() -> {
623             String configName = endScriptExecution();
624             if (configName == null) {
625                 return;
626             }
627             // delegate to DataBrokerListener to handle storing data and scheduling next task
628             mDataBrokerListener.onReportFinished(configName, result);
629         });
630     }
631 
632     /** Stores interim metrics and schedules the next task. */
onScriptSuccess(@onNull PersistableBundle stateToPersist)633     private void onScriptSuccess(@NonNull PersistableBundle stateToPersist) {
634         if (DEBUG) {
635             Slogf.d(CarLog.TAG_TELEMETRY, "A script succeeded, storing the interim result.");
636         }
637         mTelemetryHandler.post(() -> {
638             String configName = endScriptExecution();
639             if (configName == null) {
640                 return;
641             }
642             // delegate to DataBrokerListener to handle storing data and scheduling next task
643             mDataBrokerListener.onEventConsumed(configName, stateToPersist);
644         });
645     }
646 
647     /** Stores telemetry error and schedules the next task. */
onScriptError( int errorType, @NonNull String message, @Nullable String stackTrace)648     private void onScriptError(
649             int errorType, @NonNull String message, @Nullable String stackTrace) {
650         if (DEBUG) {
651             Slogf.d(CarLog.TAG_TELEMETRY, "A script failed: %d %s\n%s",
652                     errorType, message, stackTrace);
653         }
654         mTelemetryHandler.post(() -> {
655             String configName = endScriptExecution();
656             if (configName == null) {
657                 return;
658             }
659             // delegate to DataBrokerListener to handle storing data and scheduling next task
660             mDataBrokerListener.onReportFinished(
661                     configName,
662                     buildTelemetryError(
663                             TelemetryError.ErrorType.forNumber(errorType),
664                             message,
665                             stackTrace));
666         });
667     }
668 
onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)669     private void onMetricsReport(
670             @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) {
671         if (DEBUG) {
672             Slogf.d(CarLog.TAG_TELEMETRY, "A script produced a report without finishing.");
673         }
674         mTelemetryHandler.post(() -> {
675             String configName = endScriptExecution();
676             if (configName == null) {
677                 return;
678             }
679             mDataBrokerListener.onMetricsReport(configName, report, stateToPersist);
680         });
681     }
682 
683     /** Listens for script execution status. Methods are called on the binder thread. */
684     private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub {
685         private final WeakReference<DataBrokerImpl> mWeakDataBroker;
686 
ScriptExecutorListener(@onNull DataBrokerImpl dataBroker)687         private ScriptExecutorListener(@NonNull DataBrokerImpl dataBroker) {
688             mWeakDataBroker = new WeakReference<>(dataBroker);
689         }
690 
691         @Override
onScriptFinished(@onNull PersistableBundle result)692         public void onScriptFinished(@NonNull PersistableBundle result) {
693             DataBrokerImpl dataBroker = mWeakDataBroker.get();
694             if (dataBroker == null) {
695                 return;
696             }
697             dataBroker.onScriptFinished(result);
698         }
699 
700         @Override
onSuccess(@onNull PersistableBundle stateToPersist)701         public void onSuccess(@NonNull PersistableBundle stateToPersist) {
702             DataBrokerImpl dataBroker = mWeakDataBroker.get();
703             if (dataBroker == null) {
704                 return;
705             }
706             dataBroker.onScriptSuccess(stateToPersist);
707         }
708 
709         @Override
onError(int errorType, @NonNull String message, @Nullable String stackTrace)710         public void onError(int errorType, @NonNull String message, @Nullable String stackTrace) {
711             DataBrokerImpl dataBroker = mWeakDataBroker.get();
712             if (dataBroker == null) {
713                 return;
714             }
715             dataBroker.onScriptError(errorType, message, stackTrace);
716         }
717 
718         @Override
onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)719         public void onMetricsReport(
720                 @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) {
721             DataBrokerImpl dataBroker = mWeakDataBroker.get();
722             if (dataBroker == null) {
723                 return;
724             }
725             dataBroker.onMetricsReport(report, stateToPersist);
726         }
727     }
728 
729     /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */
730     class TaskHandler extends Handler {
TaskHandler(@onNull Looper looper)731         TaskHandler(@NonNull Looper looper) {
732             super(looper);
733         }
734 
735         /**
736          * Handles a message depending on the message ID.
737          * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a
738          * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when
739          * priority updates, when a new task is added to the priority queue, and when a task
740          * finishes running.
741          */
742         @Override
handleMessage(@onNull Message msg)743         public void handleMessage(@NonNull Message msg) {
744             switch (msg.what) {
745                 case MSG_HANDLE_TASK:
746                     pollAndExecuteTask(); // run the next task
747                     break;
748                 case MSG_BIND_TO_SCRIPT_EXECUTOR:
749                     bindScriptExecutor();
750                     break;
751                 case MSG_STOP_HANGING_SCRIPT:
752                     // TODO(b/223224704): log error
753                     unbindScriptExecutor();
754                     scheduleNextTask();
755                     break;
756                 default:
757                     Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message.");
758             }
759         }
760     }
761 }
762