1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "DefaultEngine.h"
16 
17 #include <android-base/logging.h>
18 
19 #include <algorithm>
20 #include <cassert>
21 #include <memory>
22 #include <mutex>
23 #include <thread>
24 #include <vector>
25 
26 #include "ClientInterface.h"
27 #include "EventGenerator.h"
28 #include "EvsDisplayManager.h"
29 #include "InputFrame.h"
30 #include "PrebuiltGraph.h"
31 
32 namespace android {
33 namespace automotive {
34 namespace computepipe {
35 namespace runner {
36 namespace engine {
37 
38 using android::automotive::computepipe::graph::PrebuiltGraph;
39 using android::automotive::computepipe::runner::client_interface::ClientInterface;
40 using android::automotive::computepipe::runner::generator::DefaultEvent;
41 using android::automotive::computepipe::runner::input_manager::InputEngineInterface;
42 using android::automotive::computepipe::runner::stream_manager::StreamEngineInterface;
43 using android::automotive::computepipe::runner::stream_manager::StreamManager;
44 
45 namespace {
46 
getStreamIdFromSource(std::string source)47 int getStreamIdFromSource(std::string source) {
48     auto pos = source.find(":");
49     return std::stoi(source.substr(pos + 1));
50 }
51 }  // namespace
52 
setClientInterface(std::unique_ptr<ClientInterface> && client)53 void DefaultEngine::setClientInterface(std::unique_ptr<ClientInterface>&& client) {
54     mClient = std::move(client);
55 }
56 
setPrebuiltGraph(std::unique_ptr<PrebuiltGraph> && graph)57 void DefaultEngine::setPrebuiltGraph(std::unique_ptr<PrebuiltGraph>&& graph) {
58     mGraph = std::move(graph);
59     mGraphDescriptor = mGraph->GetSupportedGraphConfigs();
60     if (mGraph->GetGraphType() == graph::PrebuiltGraphType::REMOTE ||
61         mGraphDescriptor.input_configs_size() == 0) {
62         mIgnoreInputManager = true;
63     }
64 }
65 
setArgs(std::string engine_args)66 Status DefaultEngine::setArgs(std::string engine_args) {
67     auto pos = engine_args.find(kNoInputManager);
68     if (pos != std::string::npos) {
69         mIgnoreInputManager = true;
70     }
71     pos = engine_args.find(kDisplayStreamId);
72     if (pos == std::string::npos) {
73         return Status::SUCCESS;
74     }
75     mDisplayStream = std::stoi(engine_args.substr(pos + strlen(kDisplayStreamId)));
76     mConfigBuilder.setDebugDisplayStream(mDisplayStream);
77     mDebugDisplayManager = std::make_unique<debug_display_manager::EvsDisplayManager>();
78     mDebugDisplayManager->setArgs(engine_args);
79     return Status::SUCCESS;
80 }
81 
activate()82 Status DefaultEngine::activate() {
83     mConfigBuilder.reset();
84     mEngineThread = std::make_unique<std::thread>(&DefaultEngine::processCommands, this);
85     return mClient->activate();
86 }
87 
processClientConfigUpdate(const proto::ConfigurationCommand & command)88 Status DefaultEngine::processClientConfigUpdate(const proto::ConfigurationCommand& command) {
89     // TODO check current phase
90     std::lock_guard<std::mutex> lock(mEngineLock);
91     if (mCurrentPhase != kResetPhase) {
92         return Status::ILLEGAL_STATE;
93     }
94     if (command.has_set_input_source()) {
95         mConfigBuilder =
96             mConfigBuilder.updateInputConfigOption(command.set_input_source().source_id());
97     } else if (command.has_set_termination_option()) {
98         mConfigBuilder = mConfigBuilder.updateTerminationOption(
99             command.set_termination_option().termination_option_id());
100     } else if (command.has_set_output_stream()) {
101         mConfigBuilder = mConfigBuilder.updateOutputStreamOption(
102             command.set_output_stream().stream_id(),
103             command.set_output_stream().max_inflight_packets_count());
104     } else if (command.has_set_offload_offload()) {
105         mConfigBuilder =
106             mConfigBuilder.updateOffloadOption(command.set_offload_offload().offload_option_id());
107     } else if (command.has_set_profile_options()) {
108         mConfigBuilder =
109             mConfigBuilder.updateProfilingType(command.set_profile_options().profile_type());
110     } else {
111         return SUCCESS;
112     }
113     return Status::SUCCESS;
114 }
115 
processClientCommand(const proto::ControlCommand & command)116 Status DefaultEngine::processClientCommand(const proto::ControlCommand& command) {
117     // TODO check current phase
118     std::lock_guard<std::mutex> lock(mEngineLock);
119 
120     if (command.has_apply_configs()) {
121         if (mCurrentPhase != kResetPhase) {
122             return Status::ILLEGAL_STATE;
123         }
124         queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_CONFIG);
125         return Status::SUCCESS;
126     }
127     if (command.has_start_graph()) {
128         if (mCurrentPhase != kConfigPhase) {
129             return Status::ILLEGAL_STATE;
130         }
131         queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_START_RUN);
132         return Status::SUCCESS;
133     }
134     if (command.has_stop_graph()) {
135         if (mCurrentPhase != kRunPhase) {
136             return Status::ILLEGAL_STATE;
137         }
138         mStopFromClient = true;
139         queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_INITIATE_STOP);
140         return Status::SUCCESS;
141     }
142     if (command.has_death_notification()) {
143         if (mCurrentPhase == kResetPhase) {
144             /**
145              * The runner is already in reset state, no need to broadcast client death
146              * to components
147              */
148             LOG(INFO) << "client death notification with no configuration";
149             return Status::SUCCESS;
150         }
151         mCurrentPhaseError = std::make_unique<ComponentError>("ClientInterface", "Client death",
152                                                               mCurrentPhase, false);
153         mWakeLooper.notify_all();
154         return Status::SUCCESS;
155     }
156     if (command.has_reset_configs()) {
157         if (mCurrentPhase != kConfigPhase) {
158             return Status::ILLEGAL_STATE;
159         }
160         queueCommand("ClientInterface", EngineCommand::Type::RESET_CONFIG);
161         return Status::SUCCESS;
162     }
163     if (command.has_start_pipe_profile()) {
164         if (mCurrentPhase != kRunPhase) {
165             return Status::ILLEGAL_STATE;
166         }
167         if (mGraph) {
168             return mGraph->StartGraphProfiling();
169         }
170         return Status::SUCCESS;
171     }
172     if (command.has_stop_pipe_profile()) {
173         if (mCurrentPhase != kRunPhase) {
174             return Status::SUCCESS;
175         }
176         if (mGraph) {
177             return mGraph->StopGraphProfiling();
178         }
179         return Status::SUCCESS;
180     }
181     if (command.has_release_debugger()) {
182         if (mCurrentPhase != kConfigPhase && mCurrentPhase != kResetPhase) {
183             return Status::ILLEGAL_STATE;
184         }
185         queueCommand("ClientInterface", EngineCommand::Type::RELEASE_DEBUGGER);
186     }
187     if (command.has_read_debug_data()) {
188         queueCommand("ClientInterface", EngineCommand::Type::READ_PROFILING);
189         return Status::SUCCESS;
190     }
191     return Status::SUCCESS;
192 }
193 
freePacket(int bufferId,int streamId)194 Status DefaultEngine::freePacket(int bufferId, int streamId) {
195     if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
196         LOG(ERROR)
197             << "Unable to find the stream manager corresponding to the id for freeing the packet.";
198         return Status::INVALID_ARGUMENT;
199     }
200     return mStreamManagers[streamId]->freePacket(bufferId);
201 }
202 
203 /**
204  * Methods from PrebuiltEngineInterface
205  */
DispatchPixelData(int streamId,int64_t timestamp,const InputFrame & frame)206 void DefaultEngine::DispatchPixelData(int streamId, int64_t timestamp, const InputFrame& frame) {
207     LOG(DEBUG) << "Engine::Received data for pixel stream  " << streamId << " with timestamp "
208               << timestamp;
209     if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
210         LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
211         return;
212     }
213     mStreamManagers[streamId]->queuePacket(frame, timestamp);
214 }
215 
DispatchSerializedData(int streamId,int64_t timestamp,std::string && output)216 void DefaultEngine::DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) {
217     LOG(DEBUG) << "Engine::Received data for stream  " << streamId << " with timestamp "
218             << timestamp;
219     if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
220         LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
221         return;
222     }
223     std::string data(output);
224     mStreamManagers[streamId]->queuePacket(data.c_str(), data.size(), timestamp);
225 }
226 
DispatchGraphTerminationMessage(Status s,std::string && msg)227 void DefaultEngine::DispatchGraphTerminationMessage(Status s, std::string&& msg) {
228     std::lock_guard<std::mutex> lock(mEngineLock);
229     if (s == SUCCESS) {
230         if (mCurrentPhase == kRunPhase) {
231             queueCommand("PrebuiltGraph", EngineCommand::Type::BROADCAST_INITIATE_STOP);
232         } else {
233             LOG(WARNING) << "Graph termination when not in run phase";
234         }
235     } else {
236         std::string error = msg;
237         queueError("PrebuiltGraph", error, false);
238     }
239 }
240 
broadcastClientConfig()241 Status DefaultEngine::broadcastClientConfig() {
242     ClientConfig config = mConfigBuilder.emitClientOptions();
243 
244     LOG(INFO) << "Engine::create stream manager";
245     Status ret = populateStreamManagers(config);
246     if (ret != Status::SUCCESS) {
247         return ret;
248     }
249 
250     if (mGraph) {
251         ret = populateInputManagers(config);
252         if (ret != Status::SUCCESS) {
253             abortClientConfig(config);
254             return ret;
255         }
256 
257         LOG(INFO) << "Engine::send client config entry to graph";
258         config.setPhaseState(PhaseState::ENTRY);
259         ret = mGraph->handleConfigPhase(config);
260         if (ret != Status::SUCCESS) {
261             abortClientConfig(config);
262             return ret;
263         }
264         LOG(INFO) << "Engine::send client config transition complete to graph";
265         config.setPhaseState(PhaseState::TRANSITION_COMPLETE);
266         ret = mGraph->handleConfigPhase(config);
267         if (ret != Status::SUCCESS) {
268             abortClientConfig(config);
269             return ret;
270         }
271     }
272     LOG(INFO) << "Engine::Graph configured";
273     // TODO add handling for remote graph
274     if (mDebugDisplayManager) {
275         mDebugDisplayManager->setFreePacketCallback(std::bind(
276                 &DefaultEngine::freePacket, this, std::placeholders::_1, mDisplayStream));
277 
278         ret = mDebugDisplayManager->handleConfigPhase(config);
279         if (ret != Status::SUCCESS) {
280             config.setPhaseState(PhaseState::ABORTED);
281             abortClientConfig(config, true);
282             return ret;
283         }
284     }
285 
286     ret = mClient->handleConfigPhase(config);
287     if (ret != Status::SUCCESS) {
288         config.setPhaseState(PhaseState::ABORTED);
289         abortClientConfig(config, true);
290         return ret;
291     }
292 
293     mCurrentPhase = kConfigPhase;
294     return Status::SUCCESS;
295 }
296 
abortClientConfig(const ClientConfig & config,bool resetGraph)297 void DefaultEngine::abortClientConfig(const ClientConfig& config, bool resetGraph) {
298     mStreamManagers.clear();
299     mInputManagers.clear();
300     if (resetGraph && mGraph) {
301         (void)mGraph->handleConfigPhase(config);
302     }
303     (void)mClient->handleConfigPhase(config);
304     // TODO add handling for remote graph
305 }
306 
broadcastStartRun()307 Status DefaultEngine::broadcastStartRun() {
308     DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RUN);
309 
310     std::vector<int> successfulStreams;
311     std::vector<int> successfulInputs;
312     for (auto& it : mStreamManagers) {
313         if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
314             LOG(ERROR) << "Engine::failure to enter run phase for stream " << it.first;
315             broadcastAbortRun(successfulStreams, successfulInputs);
316             return Status::INTERNAL_ERROR;
317         }
318         successfulStreams.push_back(it.first);
319     }
320     // TODO: send to remote
321     if (mDebugDisplayManager) {
322         (void)mDebugDisplayManager->handleExecutionPhase(runEvent);
323     }
324 
325     Status ret;
326     if (mGraph) {
327         LOG(INFO) << "Engine::sending start run to prebuilt";
328         ret = mGraph->handleExecutionPhase(runEvent);
329         if (ret != Status::SUCCESS) {
330             broadcastAbortRun(successfulStreams, successfulInputs);
331         }
332         for (auto& it : mInputManagers) {
333             if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
334                 LOG(ERROR) << "Engine::failure to enter run phase for input manager " << it.first;
335                 broadcastAbortRun(successfulStreams, successfulInputs, true);
336                 return Status::INTERNAL_ERROR;
337             }
338             successfulInputs.push_back(it.first);
339         }
340     }
341 
342     runEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RUN);
343     LOG(INFO) << "Engine::sending run transition complete to client";
344     ret = mClient->handleExecutionPhase(runEvent);
345     if (ret != Status::SUCCESS) {
346         LOG(ERROR) << "Engine::client failure to acknowledge transition to run complete ";
347         broadcastAbortRun(successfulStreams, successfulInputs, true);
348         return ret;
349     }
350     for (auto& it : mStreamManagers) {
351         (void)it.second->handleExecutionPhase(runEvent);
352     }
353     // TODO: send to remote
354     if (mDebugDisplayManager) {
355         (void)mDebugDisplayManager->handleExecutionPhase(runEvent);
356     }
357 
358     if (mGraph) {
359         LOG(INFO) << "Engine::sending run transition complete to prebuilt";
360         (void)mGraph->handleExecutionPhase(runEvent);
361         for (auto& it : mInputManagers) {
362             (void)it.second->handleExecutionPhase(runEvent);
363         }
364     }
365 
366     LOG(INFO) << "Engine::Running";
367     mCurrentPhase = kRunPhase;
368     return Status::SUCCESS;
369 }
370 
broadcastAbortRun(const std::vector<int> & streamIds,const std::vector<int> & inputIds,bool abortGraph)371 void DefaultEngine::broadcastAbortRun(const std::vector<int>& streamIds,
372                                       const std::vector<int>& inputIds, bool abortGraph) {
373     DefaultEvent runEvent = DefaultEvent::generateAbortEvent(DefaultEvent::RUN);
374     if (mDebugDisplayManager) {
375         (void)mDebugDisplayManager->handleExecutionPhase(runEvent);
376     }
377     std::for_each(streamIds.begin(), streamIds.end(), [this, runEvent](int id) {
378         (void)this->mStreamManagers[id]->handleExecutionPhase(runEvent);
379     });
380     std::for_each(inputIds.begin(), inputIds.end(), [this, runEvent](int id) {
381         (void)this->mInputManagers[id]->handleExecutionPhase(runEvent);
382     });
383     if (abortGraph) {
384         if (mGraph) {
385             (void)mGraph->handleExecutionPhase(runEvent);
386         }
387     }
388     (void)mClient->handleExecutionPhase(runEvent);
389 }
390 
broadcastStopWithFlush()391 Status DefaultEngine::broadcastStopWithFlush() {
392     DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_WITH_FLUSH);
393     if (mDebugDisplayManager) {
394         (void)mDebugDisplayManager->handleStopWithFlushPhase(runEvent);
395     }
396 
397     if (mGraph) {
398         for (auto& it : mInputManagers) {
399             (void)it.second->handleStopWithFlushPhase(runEvent);
400         }
401         if (mStopFromClient) {
402             (void)mGraph->handleStopWithFlushPhase(runEvent);
403         }
404     }
405     // TODO: send to remote.
406     for (auto& it : mStreamManagers) {
407         (void)it.second->handleStopWithFlushPhase(runEvent);
408     }
409     if (!mStopFromClient) {
410         (void)mClient->handleStopWithFlushPhase(runEvent);
411     }
412     mCurrentPhase = kStopPhase;
413     return Status::SUCCESS;
414 }
415 
broadcastStopComplete()416 Status DefaultEngine::broadcastStopComplete() {
417     DefaultEvent runEvent =
418         DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_WITH_FLUSH);
419     if (mGraph) {
420         for (auto& it : mInputManagers) {
421             (void)it.second->handleStopWithFlushPhase(runEvent);
422         }
423         (void)mGraph->handleStopWithFlushPhase(runEvent);
424     }
425     if (mDebugDisplayManager) {
426         (void)mDebugDisplayManager->handleStopWithFlushPhase(runEvent);
427     }
428     // TODO: send to remote.
429     for (auto& it : mStreamManagers) {
430         (void)it.second->handleStopWithFlushPhase(runEvent);
431     }
432     (void)mClient->handleStopWithFlushPhase(runEvent);
433     mCurrentPhase = kConfigPhase;
434     return Status::SUCCESS;
435 }
436 
broadcastHalt()437 void DefaultEngine::broadcastHalt() {
438     DefaultEvent stopEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_IMMEDIATE);
439 
440     if (mGraph) {
441         for (auto& it : mInputManagers) {
442             (void)it.second->handleStopImmediatePhase(stopEvent);
443         }
444 
445         if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos)) {
446             (void)mGraph->handleStopImmediatePhase(stopEvent);
447         }
448     }
449     if (mDebugDisplayManager) {
450         (void)mDebugDisplayManager->handleStopImmediatePhase(stopEvent);
451     }
452     // TODO: send to remote if client was source.
453     for (auto& it : mStreamManagers) {
454         (void)it.second->handleStopImmediatePhase(stopEvent);
455     }
456     if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
457         (void)mClient->handleStopImmediatePhase(stopEvent);
458     }
459 
460     stopEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_IMMEDIATE);
461     if (mGraph) {
462         for (auto& it : mInputManagers) {
463             (void)it.second->handleStopImmediatePhase(stopEvent);
464         }
465         // TODO: send to graph or remote if client was source.
466 
467         if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos) && mGraph) {
468             (void)mGraph->handleStopImmediatePhase(stopEvent);
469         }
470     }
471     if (mDebugDisplayManager) {
472         (void)mDebugDisplayManager->handleStopImmediatePhase(stopEvent);
473     }
474     for (auto& it : mStreamManagers) {
475         (void)it.second->handleStopImmediatePhase(stopEvent);
476     }
477     if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
478         (void)mClient->handleStopImmediatePhase(stopEvent);
479     }
480     mCurrentPhase = kConfigPhase;
481 }
482 
broadcastReset()483 void DefaultEngine::broadcastReset() {
484     mStreamManagers.clear();
485     mInputManagers.clear();
486     DefaultEvent resetEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RESET);
487     (void)mClient->handleResetPhase(resetEvent);
488     if (mGraph) {
489         (void)mGraph->handleResetPhase(resetEvent);
490     }
491     resetEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RESET);
492     (void)mClient->handleResetPhase(resetEvent);
493     if (mGraph) {
494         (void)mGraph->handleResetPhase(resetEvent);
495     }
496     if (mDebugDisplayManager) {
497         (void)mDebugDisplayManager->handleResetPhase(resetEvent);
498     }
499     // TODO: send to remote runner
500     mConfigBuilder.reset();
501     mCurrentPhase = kResetPhase;
502     mStopFromClient = false;
503 }
504 
populateStreamManagers(const ClientConfig & config)505 Status DefaultEngine::populateStreamManagers(const ClientConfig& config) {
506     std::map<int, int> outputConfigs;
507     if (config.getOutputStreamConfigs(outputConfigs) != Status::SUCCESS) {
508         return Status::ILLEGAL_STATE;
509     }
510     for (auto& configIt : outputConfigs) {
511         int streamId = configIt.first;
512         int maxInFlightPackets = configIt.second;
513         proto::OutputConfig outputDescriptor;
514         // find the output descriptor for requested stream id
515         bool foundDesc = false;
516         for (auto& optionIt : mGraphDescriptor.output_configs()) {
517             if (optionIt.stream_id() == streamId) {
518                 outputDescriptor = optionIt;
519                 foundDesc = true;
520                 break;
521             }
522         }
523         if (!foundDesc) {
524             LOG(ERROR) << "no matching output config for requested id " << streamId;
525             return Status::INVALID_ARGUMENT;
526         }
527         std::function<Status(std::shared_ptr<MemHandle>)> packetCb =
528             [this, streamId](std::shared_ptr<MemHandle> handle) -> Status {
529             return this->forwardOutputDataToClient(streamId, handle);
530         };
531 
532         std::function<void(std::string)> errorCb = [this, streamId](std::string m) {
533             std::string source = "StreamManager:" + std::to_string(streamId) + " : " + m;
534             this->queueError(source, m, false);
535         };
536 
537         std::function<void()> eos = [this, streamId]() {
538             std::string source = "StreamManager:" + std::to_string(streamId);
539             std::lock_guard<std::mutex> lock(this->mEngineLock);
540             this->queueCommand(source, EngineCommand::Type::POLL_COMPLETE);
541         };
542 
543         std::shared_ptr<StreamEngineInterface> engine = std::make_shared<StreamCallback>(
544             std::move(eos), std::move(errorCb), std::move(packetCb));
545         mStreamManagers.emplace(configIt.first, mStreamFactory.getStreamManager(
546                                                     outputDescriptor, engine, maxInFlightPackets));
547         if (mStreamManagers[streamId] == nullptr) {
548             LOG(ERROR) << "unable to create stream manager for stream " << streamId;
549             return Status::INTERNAL_ERROR;
550         }
551     }
552     return Status::SUCCESS;
553 }
554 
forwardOutputDataToClient(int streamId,std::shared_ptr<MemHandle> & dataHandle)555 Status DefaultEngine::forwardOutputDataToClient(int streamId,
556                                                 std::shared_ptr<MemHandle>& dataHandle) {
557     if (streamId != mDisplayStream) {
558         return mClient->dispatchPacketToClient(streamId, dataHandle);
559     }
560 
561     auto displayMgrPacket = dataHandle;
562     if (mConfigBuilder.clientConfigEnablesDisplayStream()) {
563         if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
564             displayMgrPacket = nullptr;
565         } else {
566             displayMgrPacket = mStreamManagers[streamId]->clonePacket(dataHandle);
567         }
568         Status status = mClient->dispatchPacketToClient(streamId, dataHandle);
569         if (status != Status::SUCCESS) {
570             return status;
571         }
572     }
573     CHECK(mDebugDisplayManager);
574     return mDebugDisplayManager->displayFrame(dataHandle);
575 }
576 
populateInputManagers(const ClientConfig & config)577 Status DefaultEngine::populateInputManagers(const ClientConfig& config) {
578     if (mIgnoreInputManager) {
579         return Status::SUCCESS;
580     }
581 
582     proto::InputConfig inputDescriptor;
583     int selectedId;
584 
585     if (config.getInputConfigId(&selectedId) != Status::SUCCESS) {
586         return Status::INVALID_ARGUMENT;
587     }
588 
589     for (auto& inputIt : mGraphDescriptor.input_configs()) {
590         if (selectedId == inputIt.config_id()) {
591             inputDescriptor = inputIt;
592             std::shared_ptr<InputCallback> cb = std::make_shared<InputCallback>(
593                 selectedId,
594                 [this](int id) {
595                     std::string source = "InputManager:" + std::to_string(id);
596                     this->queueError(source, "", false);
597                 },
598                 [this](int streamId, int64_t timestamp, const InputFrame& frame) {
599                     return this->mGraph->SetInputStreamPixelData(streamId, timestamp, frame);
600                 });
601             mInputManagers.emplace(selectedId,
602                                    mInputFactory.createInputManager(inputDescriptor, cb));
603             if (mInputManagers[selectedId] == nullptr) {
604                 LOG(ERROR) << "unable to create input manager for stream " << selectedId;
605                 // TODO: Add print
606                 return Status::INTERNAL_ERROR;
607             }
608             return Status::SUCCESS;
609         }
610     }
611     return Status::INVALID_ARGUMENT;
612 }
613 
614 /**
615  * Engine Command Queue and Error Queue handling
616  */
processCommands()617 void DefaultEngine::processCommands() {
618     std::unique_lock<std::mutex> lock(mEngineLock);
619     while (1) {
620         LOG(INFO) << "Engine::Waiting on commands ";
621         mWakeLooper.wait(lock, [this] {
622             if (this->mCommandQueue.empty() && !mCurrentPhaseError) {
623                 return false;
624             } else {
625                 return true;
626             }
627         });
628         if (mCurrentPhaseError) {
629             mErrorQueue.push(*mCurrentPhaseError);
630 
631             processComponentError(mCurrentPhaseError->source);
632             mCurrentPhaseError = nullptr;
633             std::queue<EngineCommand> empty;
634             std::swap(mCommandQueue, empty);
635             continue;
636         }
637         EngineCommand ec = mCommandQueue.front();
638         mCommandQueue.pop();
639         switch (ec.cmdType) {
640             case EngineCommand::Type::BROADCAST_CONFIG:
641                 LOG(INFO) << "Engine::Received broadcast config request";
642                 (void)broadcastClientConfig();
643                 break;
644             case EngineCommand::Type::BROADCAST_START_RUN:
645                 LOG(INFO) << "Engine::Received broadcast run request";
646                 (void)broadcastStartRun();
647                 break;
648             case EngineCommand::Type::BROADCAST_INITIATE_STOP:
649                 if (ec.source.find("ClientInterface") != std::string::npos) {
650                     mStopFromClient = true;
651                 }
652                 LOG(INFO) << "Engine::Received broadcast stop with flush request";
653                 broadcastStopWithFlush();
654                 break;
655             case EngineCommand::Type::POLL_COMPLETE:
656                 LOG(INFO) << "Engine::Received Poll stream managers for completion request";
657                 {
658                     int id = getStreamIdFromSource(ec.source);
659                     bool all_done = true;
660                     for (auto& it : mStreamManagers) {
661                         if (it.first == id) {
662                             continue;
663                         }
664                         if (it.second->getState() != StreamManager::State::STOPPED) {
665                             all_done = false;
666                         }
667                     }
668                     if (all_done) {
669                         broadcastStopComplete();
670                     }
671                 }
672                 break;
673             case EngineCommand::Type::RESET_CONFIG:
674                 (void)broadcastReset();
675                 break;
676             case EngineCommand::Type::RELEASE_DEBUGGER:
677                 {
678                     // broadcastReset() resets the previous copy, so save a copy of the old config.
679                     ConfigBuilder previousConfig = mConfigBuilder;
680                     (void)broadcastReset();
681                     mConfigBuilder =
682                             previousConfig.updateProfilingType(proto::ProfilingType::DISABLED);
683                     (void)broadcastClientConfig();
684                 }
685                 break;
686             case EngineCommand::Type::READ_PROFILING:
687                 std::string debugData;
688                 if (mGraph && (mCurrentPhase == kConfigPhase || mCurrentPhase == kRunPhase
689                                 || mCurrentPhase == kStopPhase)) {
690                     debugData = mGraph->GetDebugInfo();
691                 }
692                 if (mClient) {
693                     Status status = mClient->deliverGraphDebugInfo(debugData);
694                     if (status != Status::SUCCESS) {
695                         LOG(ERROR) << "Failed to deliver graph debug info to client.";
696                     }
697                 }
698                 break;
699         }
700     }
701 }
702 
processComponentError(std::string source)703 void DefaultEngine::processComponentError(std::string source) {
704     if (mCurrentPhase == kRunPhase || mCurrentPhase == kStopPhase) {
705         (void)broadcastHalt();
706     }
707     if (source.find("ClientInterface") != std::string::npos) {
708         (void)broadcastReset();
709     }
710 }
711 
queueCommand(std::string source,EngineCommand::Type type)712 void DefaultEngine::queueCommand(std::string source, EngineCommand::Type type) {
713     mCommandQueue.push(EngineCommand(source, type));
714     mWakeLooper.notify_all();
715 }
716 
queueError(std::string source,std::string msg,bool fatal)717 void DefaultEngine::queueError(std::string source, std::string msg, bool fatal) {
718     std::lock_guard<std::mutex> lock(mEngineLock);
719     // current phase already has an error report
720     if (!mCurrentPhaseError) {
721         mCurrentPhaseError = std::make_unique<ComponentError>(source, msg, mCurrentPhase, fatal);
722         mWakeLooper.notify_all();
723     }
724 }
725 
726 /**
727  * InputCallback implementation
728  */
InputCallback(int id,const std::function<void (int)> && cb,const std::function<Status (int,int64_t timestamp,const InputFrame &)> && packetCb)729 InputCallback::InputCallback(
730     int id, const std::function<void(int)>&& cb,
731     const std::function<Status(int, int64_t timestamp, const InputFrame&)>&& packetCb)
732     : mErrorCallback(cb), mPacketHandler(packetCb), mInputId(id) {
733 }
734 
dispatchInputFrame(int streamId,int64_t timestamp,const InputFrame & frame)735 Status InputCallback::dispatchInputFrame(int streamId, int64_t timestamp, const InputFrame& frame) {
736     return mPacketHandler(streamId, timestamp, frame);
737 }
738 
notifyInputError()739 void InputCallback::notifyInputError() {
740     mErrorCallback(mInputId);
741 }
742 
743 /**
744  * StreamCallback implementation
745  */
StreamCallback(const std::function<void ()> && eos,const std::function<void (std::string)> && errorCb,const std::function<Status (const std::shared_ptr<MemHandle> &)> && packetHandler)746 StreamCallback::StreamCallback(
747     const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
748     const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler)
749     : mErrorHandler(errorCb), mEndOfStreamHandler(eos), mPacketHandler(packetHandler) {
750 }
751 
notifyError(std::string msg)752 void StreamCallback::notifyError(std::string msg) {
753     mErrorHandler(msg);
754 }
755 
notifyEndOfStream()756 void StreamCallback::notifyEndOfStream() {
757     mEndOfStreamHandler();
758 }
759 
dispatchPacket(const std::shared_ptr<MemHandle> & packet)760 Status StreamCallback::dispatchPacket(const std::shared_ptr<MemHandle>& packet) {
761     return mPacketHandler(packet);
762 }
763 
764 }  // namespace engine
765 }  // namespace runner
766 }  // namespace computepipe
767 }  // namespace automotive
768 }  // namespace android
769