1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef COMPUTEPIPE_RUNNER_ENGINE_DEFAULTENGINE_H_
16 #define COMPUTEPIPE_RUNNER_ENGINE_DEFAULTENGINE_H_
17 
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
25 
26 #include "ConfigBuilder.h"
27 #include "DebugDisplayManager.h"
28 #include "InputManager.h"
29 #include "Options.pb.h"
30 #include "RunnerEngine.h"
31 #include "StreamManager.h"
32 
33 namespace android {
34 namespace automotive {
35 namespace computepipe {
36 namespace runner {
37 namespace engine {
38 
39 class InputCallback;
40 
/**
 * EngineCommand represents client requests or error events.
 * Each command is queued, and processed by the engine thread.
 */
struct EngineCommand {
    /**
     * Kind of request carried by a command. Unscoped enum; BROADCAST_CONFIG
     * is pinned to 0 and the remaining enumerators follow sequentially.
     */
    enum Type {
        BROADCAST_CONFIG = 0,
        BROADCAST_START_RUN,
        BROADCAST_INITIATE_STOP,
        POLL_COMPLETE,
        RESET_CONFIG,
        RELEASE_DEBUGGER,
        READ_PROFILING,
    };
    /**
     * Caller-supplied label for the command's origin (presumably a component
     * or client name -- confirm against the call sites).
     */
    std::string source;
    /**
     * Request kind.
     */
    Type cmdType;
    /**
     * @param s origin label; taken by value and moved into `source`, so
     *          passing an rvalue avoids a copy.
     * @param t request kind, stored in `cmdType`.
     */
    explicit EngineCommand(std::string s, Type t) : source(std::move(s)), cmdType(t) {
    }
};
61 
/**
 * Component Error represents the type of error reported by a component.
 */
struct ComponentError {
    /**
     * True when the reporter flagged the error as fatal; defaults to false.
     */
    bool isFatal;
    /**
     * Caller-supplied label for the component that reported the error.
     */
    std::string source;
    /**
     * Human readable description supplied by the reporter.
     */
    std::string message;
    /**
     * Phase label supplied by the reporter (presumably the engine phase that
     * was active when the error was raised -- confirm against the call sites).
     */
    std::string currentPhase;
    /**
     * All strings are taken by value and moved into the members, so rvalue
     * arguments are not copied.
     *
     * @param s     stored in `source`.
     * @param m     stored in `message`.
     * @param p     stored in `currentPhase`.
     * @param fatal stored in `isFatal`.
     */
    explicit ComponentError(std::string s, std::string m, std::string p, bool fatal = false)
        : isFatal(fatal),
          source(std::move(s)),
          message(std::move(m)),
          currentPhase(std::move(p)) {
    }
};
75 
/**
 * Default Engine implementation.
 * Takes ownership of externally instantiated graph & client interface
 * instances. Brings the runner online. Manages components.
 * Responds to client events.
 *
 * Commands and errors are queued (mCommandQueue / mErrorQueue) and consumed
 * by processCommands(), which is documented below as running on the engine
 * thread and using mEngineLock together with the mWakeLooper condition
 * variable.
 */
class DefaultEngine : public RunnerEngine {
  public:
    /**
     * String keys.
     * NOTE(review): presumably these are matched against the argument string
     * handed to setArgs() -- confirm in the implementation file.
     */
    static constexpr char kDisplayStreamId[] = "display_stream:";
    static constexpr char kNoInputManager[] = "no_input_manager";
    /**
     * Phase names. mCurrentPhase is initialised to kResetPhase.
     */
    static constexpr char kResetPhase[] = "Reset";
    static constexpr char kConfigPhase[] = "Config";
    static constexpr char kRunPhase[] = "Running";
    static constexpr char kStopPhase[] = "Stopping";
    /**
     * Methods from Runner Engine to override
     */
    Status setArgs(std::string engineArgs) override;
    void setClientInterface(std::unique_ptr<client_interface::ClientInterface>&& client) override;
    void setPrebuiltGraph(std::unique_ptr<graph::PrebuiltGraph>&& graph) override;
    Status activate() override;
    /**
     * Methods from ClientEngineInterface to override
     */
    Status processClientConfigUpdate(const proto::ConfigurationCommand& command) override;
    Status processClientCommand(const proto::ControlCommand& command) override;
    Status freePacket(int bufferId, int streamId) override;
    /**
     * Methods from PrebuiltEngineInterface to override
     */
    void DispatchPixelData(int streamId, int64_t timestamp, const InputFrame& frame) override;

    void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) override;

    void DispatchGraphTerminationMessage(Status s, std::string&& msg) override;

  private:
    // TODO: b/147704051 Add thread analyzer annotations
    /**
     * Broadcast client config to all components. If all components handle the
     * notification correctly, then broadcast transition complete.
     * Successful return from this function implies runner has transitioned to
     * configuration done.
     * @Lock held mEngineLock
     */
    Status broadcastClientConfig();
    /**
     * Abort an ongoing attempt to apply client configs.
     * @param config the client config whose application is being aborted.
     * @param resetGraph defaults to false.
     *        NOTE(review): presumably requests that the graph be reset as
     *        well -- confirm in the implementation file.
     * @Lock held mEngineLock
     */
    void abortClientConfig(const ClientConfig& config, bool resetGraph = false);
    /**
     * Broadcast start to all components. The order of entry into run phase
     * notification delivery is downstream components to upstream components.
     * If all components handle the entry notification correctly then broadcast
     * transition complete notification again from downstream to upstream.
     * Successful return from this function implies runner has transitioned to
     * running.
     * @Lock held mEngineLock
     */
    Status broadcastStartRun();
    /**
     * Broadcast abort of started run to given components. This gets called if during
     * broadcastStartRun(), one of the components failed to set itself up for the run. In that case
     * the components that had successfully acknowledged broadcastStartRun(),
     * need to be told to abort. We transition back to config phase at the end
     * of this call.
     * @param streamIds ids of the stream managers to notify.
     * @param inputIds ids of the input managers to notify.
     * @param graph defaults to false.
     *        NOTE(review): presumably selects whether the graph is notified
     *        too -- confirm in the implementation file.
     * @Lock held mEngineLock
     */
    void broadcastAbortRun(const std::vector<int>& streamIds, const std::vector<int>& inputIds,
                           bool graph = false);

    /**
     * Broadcast stop with flush to all components. The stop with flush phase
     * entry notification is sent in order from upstream to downstream.
     * A successful return can leave the runner in stopping phase.
     * We transition to stop completely, once all inflight traffic has been drained at a later
     * point, identified by stream managers.
     * @Lock held mEngineLock
     */
    Status broadcastStopWithFlush();
    /**
     * Broadcast transition to stop complete. This is a confirmation to all
     * components that stop has finished. At the end of this we transition back
     * to config phase.
     * @Lock held mEngineLock
     */
    Status broadcastStopComplete();
    /**
     * Broadcast halt to all components. All inflight traffic is dropped.
     * Successful return from this function implies all components have
     * exited run phase and are back in config phase.
     * @Lock held mEngineLock
     */
    void broadcastHalt();
    /**
     * Broadcast reset to all components. All components drop client
     * specific configuration and transition to reset state. For RAII
     * components, they are freed at this point. Also resets the mConfigBuilder
     * to its original state. Successful return puts the runner in reset phase.
     * @Lock held mEngineLock
     */
    void broadcastReset();
    /**
     * Populate stream managers for a given client config. For each client
     * selected output config, we generate stream managers. During reset phase
     * we clear out any previously constructed stream managers. This should be
     * invoked only in response to applyConfigs() issued by client.
     * @Lock held mEngineLock
     */
    Status populateStreamManagers(const ClientConfig& config);
    /**
     * Populate input managers for a given client config. For each client
     * selected output config, we generate input managers. During reset phase
     * we clear out any previously constructed input managers. This should be
     * invoked only in response to applyConfigs() issued by client.
     * NOTE(review): "output config" above mirrors the wording used for
     * populateStreamManagers(); presumably "input config" is meant -- confirm.
     * @Lock held mEngineLock
     */
    Status populateInputManagers(const ClientConfig& config);
    /**
     * Helper method to forward packet to client interface for transmission
     * @param streamId id of the stream the packet belongs to.
     * @param handle packet to forward; taken by non-const reference.
     */
    Status forwardOutputDataToClient(int streamId, std::shared_ptr<MemHandle>& handle);
    /**
     * Helper to handle error notification from components, in the errorQueue.
     * In case the source of the error is client interface, it will
     * broadcastReset().
     * This is called in the mEngineThread when processing an entry from the
     * errorQueue.
     * @Lock acquires mEngineLock.
     */
    void processComponentError(std::string source);
    /**
     * Method run by the engine thread to process commands.
     * Uses condition variable. Acquires lock mEngineLock to access
     * command queues.
     */
    void processCommands();
    /**
     * Method run by external components to queue commands to the engine.
     * Must be called with mEngineLock held. Wakes up the looper.
     */
    void queueCommand(std::string source, EngineCommand::Type type);
    /**
     * Method called by component reporting error.
     * This will acquire mEngineLock and queue the error.
     */
    void queueError(std::string source, std::string msg, bool fatal);
    /**
     * client interface handle
     */
    std::unique_ptr<client_interface::ClientInterface> mClient = nullptr;
    /**
     * builder to build up client config incrementally.
     */
    ConfigBuilder mConfigBuilder;
    /**
     * Stream management members. mStreamManagers is keyed by an int id
     * (presumably the stream id -- confirm in the implementation file).
     */
    std::map<int, std::unique_ptr<stream_manager::StreamManager>> mStreamManagers;
    stream_manager::StreamManagerFactory mStreamFactory;
    /**
     * Input manager members. mInputManagers is keyed by an int id
     * (presumably the input id -- confirm in the implementation file).
     */
    std::map<int, std::unique_ptr<input_manager::InputManager>> mInputManagers;
    input_manager::InputManagerFactory mInputFactory;
    /**
     * stream to dump to display for debug purposes; starts out as
     * ClientConfig::kInvalidId.
     */
    int32_t mDisplayStream = ClientConfig::kInvalidId;
    /**
     * Debug display manager.
     */
    std::unique_ptr<debug_display_manager::DebugDisplayManager> mDebugDisplayManager = nullptr;
    /**
     * graph descriptor
     */
    proto::Options mGraphDescriptor;
    std::unique_ptr<graph::PrebuiltGraph> mGraph;
    /**
     * stop signal source; defaults to true.
     */
    bool mStopFromClient = true;
    /**
     * Phase management members. The engine starts in kResetPhase.
     */
    std::string mCurrentPhase = kResetPhase;
    std::mutex mEngineLock;
    /**
     * Used to track the first error occurrence for a given phase.
     */
    std::unique_ptr<ComponentError> mCurrentPhaseError = nullptr;

    /**
     * Queue for client commands
     */
    std::queue<EngineCommand> mCommandQueue;
    /**
     * Queue for error notifications
     */
    std::queue<ComponentError> mErrorQueue;
    /**
     * Engine looper.
     * NOTE(review): no destructor is declared in this class, so nothing
     * visible here joins the thread before the std::thread is destroyed --
     * confirm how shutdown is handled.
     */
    std::unique_ptr<std::thread> mEngineThread;
    /**
     * Condition variable for looper
     */
    std::condition_variable mWakeLooper;
    /**
     * ignore input manager allocation; defaults to false.
     */
    bool mIgnoreInputManager = false;
    /**
     * Arguments set by setArgs().
     */
    std::string mEngineArgs;
};
294 
/**
 * Handles callbacks from individual stream managers as specified in the
 * StreamEngineInterface.
 *
 * Holds three std::function handlers supplied at construction.
 */
class StreamCallback : public stream_manager::StreamEngineInterface {
  public:
    /**
     * @param eos handler taking no arguments (presumably invoked from
     *        notifyEndOfStream() -- confirm in the implementation file).
     * @param errorCb handler taking an error message string (presumably
     *        invoked from notifyError()).
     * @param packetHandler handler taking an output packet and returning a
     *        Status (presumably invoked from dispatchPacket()).
     *
     * NOTE(review): the parameters are rvalue references to const, so they
     * bind only to rvalues yet cannot be moved from; the stored handlers
     * therefore have to be copies. Changing this requires touching the
     * definition and every call site.
     */
    explicit StreamCallback(
        const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
        const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler);
    void notifyEndOfStream() override;
    void notifyError(std::string msg) override;
    Status dispatchPacket(const std::shared_ptr<MemHandle>& outData) override;
    ~StreamCallback() = default;

  private:
    // Members are initialised in declaration order, which differs from the
    // constructor's parameter order (error handler first, then end-of-stream).
    std::function<void(std::string)> mErrorHandler;
    std::function<void()> mEndOfStreamHandler;
    std::function<Status(const std::shared_ptr<MemHandle>&)> mPacketHandler;
};
314 
/**
 * Handles callbacks from input managers. Forwards frames to the graph.
 * Only used if graph implementation is local
 *
 * Holds an int id plus two std::function handlers supplied at construction.
 */
class InputCallback : public input_manager::InputEngineInterface {
  public:
    /**
     * @param id integer id (presumably stored in mInputId -- confirm in the
     *        implementation file).
     * @param cb handler taking an int (presumably the error callback invoked
     *        from notifyInputError() with the input id).
     * @param packetCb handler taking (int, int64_t, const InputFrame&) and
     *        returning a Status; same argument shape as dispatchInputFrame().
     *
     * NOTE(review): the handler parameters are rvalue references to const, so
     * they bind only to rvalues yet cannot be moved from; the stored handlers
     * therefore have to be copies.
     */
    explicit InputCallback(int id, const std::function<void(int)>&& cb,
                           const std::function<Status(int, int64_t, const InputFrame&)>&& packetCb);
    Status dispatchInputFrame(int streamId, int64_t timestamp, const InputFrame& frame) override;
    void notifyInputError() override;
    ~InputCallback() = default;

  private:
    // Members are initialised in declaration order; mInputId is declared last
    // although the id is the constructor's first parameter.
    std::function<void(int)> mErrorCallback;
    std::function<Status(int, int64_t, const InputFrame&)> mPacketHandler;
    int mInputId;
};
332 
333 }  // namespace engine
334 }  // namespace runner
335 }  // namespace computepipe
336 }  // namespace automotive
337 }  // namespace android
338 
339 #endif  // COMPUTEPIPE_RUNNER_ENGINE_DEFAULTENGINE_H_
340