1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "StreamSetObserver.h"
16 
17 #include <android-base/logging.h>
18 #include <grpcpp/grpcpp.h>
19 
20 #include "ClientConfig.pb.h"
21 #include "GrpcGraph.h"
22 #include "InputFrame.h"
23 #include "RunnerComponent.h"
24 #include "prebuilt_interface.h"
25 #include "types/Status.h"
26 
27 namespace android {
28 namespace automotive {
29 namespace computepipe {
30 namespace graph {
31 
SingleStreamObserver(int streamId,EndOfStreamReporter * endOfStreamReporter,StreamGraphInterface * streamGraphInterface)32 SingleStreamObserver::SingleStreamObserver(int streamId, EndOfStreamReporter* endOfStreamReporter,
33                                            StreamGraphInterface* streamGraphInterface) :
34       mStreamId(streamId),
35       mEndOfStreamReporter(endOfStreamReporter),
36       mStreamGraphInterface(streamGraphInterface) {}
37 
startObservingStream()38 Status SingleStreamObserver::startObservingStream() {
39     {
40         std::lock_guard lock(mStopObservationLock);
41         mStopped = false;
42     }
43     mThread = std::thread([this]() {
44         proto::ObserveOutputStreamRequest observeStreamRequest;
45         observeStreamRequest.set_stream_id(mStreamId);
46         ::grpc::ClientContext context;
47         ::grpc::CompletionQueue cq;
48 
49         void* tag;
50         bool cqStatus;
51 
52         std::unique_ptr<::grpc::ClientAsyncReader<proto::OutputStreamResponse>> rpc(
53                 mStreamGraphInterface->getServiceStub()
54                         ->AsyncObserveOutputStream(&context, observeStreamRequest, &cq, nullptr));
55 
56         proto::OutputStreamResponse response;
57 
58         cq.Next(&tag, &cqStatus);
59         while (cqStatus) {
60             // Dispatch data only stream is being observed.
61             rpc->Read(&response, nullptr);
62             {
63                 std::lock_guard lock(mStopObservationLock);
64                 if (mStopped || mStreamGraphInterface == nullptr) {
65                     LOG(INFO) << "Graph stopped. ";
66                     break;
67                 }
68 
69                 // Since this is a separate thread, we need not worry about recursive locking
70                 // and callback can be executed without creating a new thread.
71                 if (response.has_pixel_data()) {
72                     proto::PixelData pixels = response.pixel_data();
73                     runner::InputFrame frame(pixels.height(), pixels.width(),
74                                              static_cast<PixelFormat>(
75                                                      static_cast<int>(pixels.format())),
76                                              pixels.step(),
77                                              reinterpret_cast<const unsigned char*>(
78                                                      pixels.data().c_str()));
79                     mStreamGraphInterface->dispatchPixelData(mStreamId, response.timestamp_us(),
80                                                              frame);
81                 } else if (response.has_semantic_data()) {
82                     mStreamGraphInterface
83                             ->dispatchSerializedData(mStreamId, response.timestamp_us(),
84                                                      std::move(*response.mutable_semantic_data()));
85                 }
86             }
87 
88             cq.Next(&tag, &cqStatus);
89         }
90 
91         ::grpc::Status grpcStatus;
92         rpc->Finish(&grpcStatus, nullptr);
93         if (!grpcStatus.ok()) {
94             LOG(ERROR) << "Failed RPC with message: " << grpcStatus.error_message();
95         }
96 
97         cq.Shutdown();
98         if (mEndOfStreamReporter) {
99             std::lock_guard lock(mStopObservationLock);
100             mStopped = true;
101             std::thread t =
102                     std::thread([this]() { mEndOfStreamReporter->reportStreamClosed(mStreamId); });
103 
104             t.detach();
105         }
106 
107         proto::OutputStreamResponse streamResponse;
108     });
109 
110     return Status::SUCCESS;
111 }
112 
stopObservingStream()113 void SingleStreamObserver::stopObservingStream() {
114     std::lock_guard lock(mStopObservationLock);
115     mStopped = true;
116 }
117 
~SingleStreamObserver()118 SingleStreamObserver::~SingleStreamObserver() {
119     stopObservingStream();
120 
121     if (mThread.joinable()) {
122         mThread.join();
123     }
124 }
125 
StreamSetObserver(const runner::ClientConfig & clientConfig,StreamGraphInterface * streamGraphInterface)126 StreamSetObserver::StreamSetObserver(const runner::ClientConfig& clientConfig,
127                                      StreamGraphInterface* streamGraphInterface) :
128       mClientConfig(clientConfig), mStreamGraphInterface(streamGraphInterface) {}
129 
startObservingStreams()130 Status StreamSetObserver::startObservingStreams() {
131     std::lock_guard lock(mLock);
132     std::map<int, int> outputConfigs = {};
133     mClientConfig.getOutputStreamConfigs(outputConfigs);
134 
135     if (!mStopped || !mStreamObservers.empty()) {
136         LOG(ERROR) << "Already started observing streams. Duplicate call is not allowed";
137         return Status::ILLEGAL_STATE;
138     }
139 
140     for (const auto& it : outputConfigs) {
141         auto streamObserver =
142                 std::make_unique<SingleStreamObserver>(it.first, this, mStreamGraphInterface);
143         Status status = streamObserver->startObservingStream();
144         if (status != Status::SUCCESS) {
145             std::thread t([this]() { stopObservingStreams(true); });
146             t.detach();
147             return status;
148         }
149         mStreamObservers.emplace(std::make_pair(it.first, std::move(streamObserver)));
150     }
151 
152     mStopped = mStreamObservers.empty();
153     return Status::SUCCESS;
154 }
155 
stopObservingStreams(bool stopImmediately)156 void StreamSetObserver::stopObservingStreams(bool stopImmediately) {
157     std::unique_lock lock(mLock);
158     if (mStopped) {
159         // Separate thread is necessary here to avoid recursive locking.
160         std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
161             streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
162         });
163         t.detach();
164         return;
165     }
166 
167     // Wait for the streams to close if we are not stopping immediately.
168     if (stopImmediately) {
169         for (auto& it : mStreamObservers) {
170             it.second->stopObservingStream();
171         }
172 
173         mStoppedCv.wait(lock, [this]() -> bool { return mStopped; });
174     }
175 }
176 
reportStreamClosed(int streamId)177 void StreamSetObserver::reportStreamClosed(int streamId) {
178     std::lock_guard lock(mLock);
179     auto streamObserver = mStreamObservers.find(streamId);
180     if (streamObserver == mStreamObservers.end()) {
181         return;
182     }
183     mStreamObservers.erase(streamObserver);
184     if (mStreamObservers.empty()) {
185         mStopped = true;
186         mStoppedCv.notify_one();
187         std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
188             streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
189         });
190         t.detach();
191     }
192 }
193 
194 }  // namespace graph
195 }  // namespace computepipe
196 }  // namespace automotive
197 }  // namespace android
198