1 // Copyright (C) 2020 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef COMPUTEPIPE_RUNNER_ENGINE_DEFAULTENGINE_H_ 16 #define COMPUTEPIPE_RUNNER_ENGINE_DEFAULTENGINE_H_ 17 18 #include <condition_variable> 19 #include <functional> 20 #include <mutex> 21 #include <queue> 22 #include <string> 23 #include <thread> 24 #include <vector> 25 26 #include "ConfigBuilder.h" 27 #include "DebugDisplayManager.h" 28 #include "InputManager.h" 29 #include "Options.pb.h" 30 #include "RunnerEngine.h" 31 #include "StreamManager.h" 32 33 namespace android { 34 namespace automotive { 35 namespace computepipe { 36 namespace runner { 37 namespace engine { 38 39 class InputCallback; 40 41 /** 42 * EngineCommand represents client requests or error events. 43 * Each command is queued, and processed by the engine thread. 44 */ 45 struct EngineCommand { 46 public: 47 enum Type { 48 BROADCAST_CONFIG = 0, 49 BROADCAST_START_RUN, 50 BROADCAST_INITIATE_STOP, 51 POLL_COMPLETE, 52 RESET_CONFIG, 53 RELEASE_DEBUGGER, 54 READ_PROFILING, 55 }; 56 std::string source; 57 Type cmdType; EngineCommandEngineCommand58 explicit EngineCommand(std::string s, Type t) : source(s), cmdType(t) { 59 } 60 }; 61 62 /** 63 * Component Error represents the type of error reported by a component. 64 */ 65 struct ComponentError { 66 public: 67 bool isFatal; 68 std::string source; 69 std::string message; 70 std::string currentPhase; 71 explicit ComponentError(std::string s, std::string m, std::string p, bool fatal = false) isFatalComponentError72 : isFatal(fatal), source(s), message(m), currentPhase(p) { 73 } 74 }; 75 76 /** 77 * Default Engine implementation. 78 * Takes ownership of externally instantiated graph & client interface 79 * instances. Brings the runner online. Manages components. 80 * Responds to client events. 81 */ 82 class DefaultEngine : public RunnerEngine { 83 public: 84 static constexpr char kDisplayStreamId[] = "display_stream:"; 85 static constexpr char kNoInputManager[] = "no_input_manager"; 86 static constexpr char kResetPhase[] = "Reset"; 87 static constexpr char kConfigPhase[] = "Config"; 88 static constexpr char kRunPhase[] = "Running"; 89 static constexpr char kStopPhase[] = "Stopping"; 90 /** 91 * Methods from Runner Engine to override 92 */ 93 Status setArgs(std::string engine_args) override; 94 void setClientInterface(std::unique_ptr<client_interface::ClientInterface>&& client) override; 95 void setPrebuiltGraph(std::unique_ptr<graph::PrebuiltGraph>&& graph) override; 96 Status activate() override; 97 /** 98 * Methods from ClientEngineInterface to override 99 */ 100 Status processClientConfigUpdate(const proto::ConfigurationCommand& command) override; 101 Status processClientCommand(const proto::ControlCommand& command) override; 102 Status freePacket(int bufferId, int streamId) override; 103 /** 104 * Methods from PrebuiltEngineInterface to override 105 */ 106 void DispatchPixelData(int streamId, int64_t timestamp, const InputFrame& frame) override; 107 108 void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) override; 109 110 void DispatchGraphTerminationMessage(Status s, std::string&& msg) override; 111 112 private: 113 // TODO: b/147704051 Add thread analyzer annotations 114 /** 115 * BroadCast Client config to all components. If all components handle the 116 * notification correctly, then broadcast transition complete. 117 * Successful return from this function implies runner has transitioned to 118 * configuration done. 119 * @Lock held mEngineLock 120 */ 121 Status broadcastClientConfig(); 122 /** 123 * Abort an ongoing attempt to apply client configs. 124 * @Lock held mEngineLock 125 */ 126 void abortClientConfig(const ClientConfig& config, bool resetGraph = false); 127 /** 128 * BroadCast start to all components. The order of entry into run phase 129 * notification delivery is downstream components to upstream components. 130 * If all components handle the entry notification correctly then broadcast 131 * transition complete notification again from down stream to upstream. 132 * Successful return from this function implies runner has transitioned to 133 * running. 134 * @Lock held mEngineLock 135 */ 136 Status broadcastStartRun(); 137 /** 138 * BroadCast abort of started run to given components. This gets called if during 139 * broadcastStartRun(), one of the components failed to set itself up for the run. In that case 140 * the components that had successfully acknowledged broacastStartRun(), 141 * need to be told to abort. We transition back to config phase at the end 142 * of this call. 143 * @Lock held mEngineLock 144 */ 145 void broadcastAbortRun(const std::vector<int>& streamIds, const std::vector<int>& inputIds, 146 bool graph = false); 147 148 /** 149 * Broadcast stop with flush to all components. The stop with flush phase 150 * entry notification is sent from in the order of upstream to downstream. 151 * A successful return can leave the runner in stopping phase. 152 * We transition to stop completely, once all inflight traffic has been drained at a later 153 * point, identified by stream managers. 154 * @Lock held mEngineLock 155 */ 156 Status broadcastStopWithFlush(); 157 /** 158 * Broadcast transtion to stop complete. This is a confirmation to all 159 * components that stop has finished. At the end of this we transition back 160 * to config phase. 161 * @Lock held mEngineLock 162 */ 163 Status broadcastStopComplete(); 164 /** 165 * Broadcast halt to all components. All inflight traffic is dropped. 166 * Successful return from this function implies all components have 167 * exited run phase and are back in config phase. 168 * @Lock held mEngineLock 169 */ 170 void broadcastHalt(); 171 /** 172 * Broadcast reset to all components. All components drop client 173 * specific configuration and transition to reset state. For RAII 174 * components, they are freed at this point. ALso resets the mConfigBuilder 175 * to its original state. Successful return puts the runner in reset phase. 176 * @Lock held mEngineLock 177 */ 178 void broadcastReset(); 179 /** 180 * Populate stream managers for a given client config. For each client 181 * selected output config, we generate stream managers. During reset phase 182 * we clear out any previously constructed stream managers. This should be 183 * invoked only in response to applyConfigs() issued by client. 184 * @Lock held mEngineLock 185 */ 186 Status populateStreamManagers(const ClientConfig& config); 187 /** 188 * Populate input managers for a given client config. For each client 189 * selected output config, we generate input managers. During reset phase 190 * we clear out any previously constructed input managers. This should be 191 * invoked only in response to applyConfigs() issued by client. 192 * @Lock held mEngineLock 193 */ 194 Status populateInputManagers(const ClientConfig& config); 195 /** 196 * Helper method to forward packet to client interface for transmission 197 */ 198 Status forwardOutputDataToClient(int streamId, std::shared_ptr<MemHandle>& handle); 199 /** 200 * Helper to handle error notification from components, in the errorQueue. 201 * In case the source of the error is client interface, it will 202 * broadcastReset(). 203 * This called in the mEngineThread when processing an entry from the 204 * errorQueue, 205 * @Lock acquires mEngineLock. 206 */ 207 void processComponentError(std::string source); 208 /** 209 * Method run by the engine thread to process commands. 210 * Uses condition variable. Acquires lock mEngineLock to access 211 * command queues. 212 */ 213 void processCommands(); 214 /** 215 * Method run by external components to queue commands to the engine. 216 * Must be called with mEngineLock held. Wakes up the looper. 217 */ 218 void queueCommand(std::string source, EngineCommand::Type type); 219 /** 220 * Method called by component reporting error. 221 * This will acquire mEngineLock and queue the error. 222 */ 223 void queueError(std::string source, std::string msg, bool fatal); 224 /** 225 * client interface handle 226 */ 227 std::unique_ptr<client_interface::ClientInterface> mClient = nullptr; 228 /** 229 * builder to build up client config incrementally. 230 */ 231 ConfigBuilder mConfigBuilder; 232 /** 233 * Stream management members 234 */ 235 std::map<int, std::unique_ptr<stream_manager::StreamManager>> mStreamManagers; 236 stream_manager::StreamManagerFactory mStreamFactory; 237 /** 238 * Input manager members 239 */ 240 std::map<int, std::unique_ptr<input_manager::InputManager>> mInputManagers; 241 input_manager::InputManagerFactory mInputFactory; 242 /** 243 * stream to dump to display for debug purposes 244 */ 245 int32_t mDisplayStream = ClientConfig::kInvalidId; 246 /** 247 * Debug display manager. 248 */ 249 std::unique_ptr<debug_display_manager::DebugDisplayManager> mDebugDisplayManager = nullptr; 250 /** 251 * graph descriptor 252 */ 253 proto::Options mGraphDescriptor; 254 std::unique_ptr<graph::PrebuiltGraph> mGraph; 255 /** 256 * stop signal source 257 */ 258 bool mStopFromClient = true; 259 /** 260 * Phase management members 261 */ 262 std::string mCurrentPhase = kResetPhase; 263 std::mutex mEngineLock; 264 /** 265 * Used to track the first error occurrence for a given phase. 266 */ 267 std::unique_ptr<ComponentError> mCurrentPhaseError = nullptr; 268 269 /** 270 * Queue for client commands 271 */ 272 std::queue<EngineCommand> mCommandQueue; 273 /** 274 * Queue for error notifications 275 */ 276 std::queue<ComponentError> mErrorQueue; 277 /** 278 * Engine looper 279 */ 280 std::unique_ptr<std::thread> mEngineThread; 281 /** 282 * Condition variable for looper 283 */ 284 std::condition_variable mWakeLooper; 285 /** 286 * ignore input manager allocation 287 */ 288 bool mIgnoreInputManager = false; 289 }; 290 291 /** 292 * Handles callbacks from individual stream managers as specified in the 293 * StreamEngineInterface. 294 */ 295 class StreamCallback : public stream_manager::StreamEngineInterface { 296 public: 297 explicit StreamCallback( 298 const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb, 299 const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler); 300 void notifyEndOfStream() override; 301 void notifyError(std::string msg) override; 302 Status dispatchPacket(const std::shared_ptr<MemHandle>& outData) override; 303 ~StreamCallback() = default; 304 305 private: 306 std::function<void(std::string)> mErrorHandler; 307 std::function<void()> mEndOfStreamHandler; 308 std::function<Status(const std::shared_ptr<MemHandle>&)> mPacketHandler; 309 }; 310 311 /** 312 * Handles callbacks from input managers. Forwards frames to the graph. 313 * Only used if graph implementation is local 314 */ 315 class InputCallback : public input_manager::InputEngineInterface { 316 public: 317 explicit InputCallback(int id, const std::function<void(int)>&& cb, 318 const std::function<Status(int, int64_t, const InputFrame&)>&& packetCb); 319 Status dispatchInputFrame(int streamId, int64_t timestamp, const InputFrame& frame) override; 320 void notifyInputError() override; 321 ~InputCallback() = default; 322 323 private: 324 std::function<void(int)> mErrorCallback; 325 std::function<Status(int, int64_t, const InputFrame&)> mPacketHandler; 326 int mInputId; 327 }; 328 329 } // namespace engine 330 } // namespace runner 331 } // namespace computepipe 332 } // namespace automotive 333 } // namespace android 334 335 #endif // COMPUTEPIPE_RUNNER_ENGINE_DEFAULTENGINE_H_ 336