1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "StreamSetObserver.h"
16
17 #include <android-base/logging.h>
18 #include <grpcpp/grpcpp.h>
19
20 #include "ClientConfig.pb.h"
21 #include "GrpcGraph.h"
22 #include "InputFrame.h"
23 #include "RunnerComponent.h"
24 #include "prebuilt_interface.h"
25 #include "types/Status.h"
26
27 namespace android {
28 namespace automotive {
29 namespace computepipe {
30 namespace graph {
31
SingleStreamObserver(int streamId,EndOfStreamReporter * endOfStreamReporter,StreamGraphInterface * streamGraphInterface)32 SingleStreamObserver::SingleStreamObserver(int streamId, EndOfStreamReporter* endOfStreamReporter,
33 StreamGraphInterface* streamGraphInterface) :
34 mStreamId(streamId),
35 mEndOfStreamReporter(endOfStreamReporter),
36 mStreamGraphInterface(streamGraphInterface) {}
37
startObservingStream()38 Status SingleStreamObserver::startObservingStream() {
39 {
40 std::lock_guard lock(mStopObservationLock);
41 mStopped = false;
42 }
43 mThread = std::thread([this]() {
44 proto::ObserveOutputStreamRequest observeStreamRequest;
45 observeStreamRequest.set_stream_id(mStreamId);
46 ::grpc::ClientContext context;
47 ::grpc::CompletionQueue cq;
48
49 void* tag;
50 bool cqStatus;
51
52 std::unique_ptr<::grpc::ClientAsyncReader<proto::OutputStreamResponse>> rpc(
53 mStreamGraphInterface->getServiceStub()
54 ->AsyncObserveOutputStream(&context, observeStreamRequest, &cq, nullptr));
55
56 proto::OutputStreamResponse response;
57
58 cq.Next(&tag, &cqStatus);
59 while (cqStatus) {
60 // Dispatch data only stream is being observed.
61 rpc->Read(&response, nullptr);
62 {
63 std::lock_guard lock(mStopObservationLock);
64 if (mStopped || mStreamGraphInterface == nullptr) {
65 LOG(INFO) << "Graph stopped. ";
66 break;
67 }
68
69 // Since this is a separate thread, we need not worry about recursive locking
70 // and callback can be executed without creating a new thread.
71 if (response.has_pixel_data()) {
72 proto::PixelData pixels = response.pixel_data();
73 runner::InputFrame frame(pixels.height(), pixels.width(),
74 static_cast<PixelFormat>(
75 static_cast<int>(pixels.format())),
76 pixels.step(),
77 reinterpret_cast<const unsigned char*>(
78 pixels.data().c_str()));
79 mStreamGraphInterface->dispatchPixelData(mStreamId, response.timestamp_us(),
80 frame);
81 } else if (response.has_semantic_data()) {
82 mStreamGraphInterface
83 ->dispatchSerializedData(mStreamId, response.timestamp_us(),
84 std::move(*response.mutable_semantic_data()));
85 }
86 }
87
88 cq.Next(&tag, &cqStatus);
89 }
90
91 ::grpc::Status grpcStatus;
92 rpc->Finish(&grpcStatus, nullptr);
93 if (!grpcStatus.ok()) {
94 LOG(ERROR) << "Failed RPC with message: " << grpcStatus.error_message();
95 }
96
97 cq.Shutdown();
98 if (mEndOfStreamReporter) {
99 std::lock_guard lock(mStopObservationLock);
100 mStopped = true;
101 std::thread t =
102 std::thread([this]() { mEndOfStreamReporter->reportStreamClosed(mStreamId); });
103
104 t.detach();
105 }
106
107 proto::OutputStreamResponse streamResponse;
108 });
109
110 return Status::SUCCESS;
111 }
112
stopObservingStream()113 void SingleStreamObserver::stopObservingStream() {
114 std::lock_guard lock(mStopObservationLock);
115 mStopped = true;
116 }
117
~SingleStreamObserver()118 SingleStreamObserver::~SingleStreamObserver() {
119 stopObservingStream();
120
121 if (mThread.joinable()) {
122 mThread.join();
123 }
124 }
125
StreamSetObserver(const runner::ClientConfig & clientConfig,StreamGraphInterface * streamGraphInterface)126 StreamSetObserver::StreamSetObserver(const runner::ClientConfig& clientConfig,
127 StreamGraphInterface* streamGraphInterface) :
128 mClientConfig(clientConfig), mStreamGraphInterface(streamGraphInterface) {}
129
startObservingStreams()130 Status StreamSetObserver::startObservingStreams() {
131 std::lock_guard lock(mLock);
132 std::map<int, int> outputConfigs = {};
133 mClientConfig.getOutputStreamConfigs(outputConfigs);
134
135 if (!mStopped || !mStreamObservers.empty()) {
136 LOG(ERROR) << "Already started observing streams. Duplicate call is not allowed";
137 return Status::ILLEGAL_STATE;
138 }
139
140 for (const auto& it : outputConfigs) {
141 auto streamObserver =
142 std::make_unique<SingleStreamObserver>(it.first, this, mStreamGraphInterface);
143 Status status = streamObserver->startObservingStream();
144 if (status != Status::SUCCESS) {
145 std::thread t([this]() { stopObservingStreams(true); });
146 t.detach();
147 return status;
148 }
149 mStreamObservers.emplace(std::make_pair(it.first, std::move(streamObserver)));
150 }
151
152 mStopped = mStreamObservers.empty();
153 return Status::SUCCESS;
154 }
155
stopObservingStreams(bool stopImmediately)156 void StreamSetObserver::stopObservingStreams(bool stopImmediately) {
157 std::unique_lock lock(mLock);
158 if (mStopped) {
159 // Separate thread is necessary here to avoid recursive locking.
160 std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
161 streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
162 });
163 t.detach();
164 return;
165 }
166
167 // Wait for the streams to close if we are not stopping immediately.
168 if (stopImmediately) {
169 for (auto& it : mStreamObservers) {
170 it.second->stopObservingStream();
171 }
172
173 mStoppedCv.wait(lock, [this]() -> bool { return mStopped; });
174 }
175 }
176
reportStreamClosed(int streamId)177 void StreamSetObserver::reportStreamClosed(int streamId) {
178 std::lock_guard lock(mLock);
179 auto streamObserver = mStreamObservers.find(streamId);
180 if (streamObserver == mStreamObservers.end()) {
181 return;
182 }
183 mStreamObservers.erase(streamObserver);
184 if (mStreamObservers.empty()) {
185 mStopped = true;
186 mStoppedCv.notify_one();
187 std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
188 streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
189 });
190 t.detach();
191 }
192 }
193
194 } // namespace graph
195 } // namespace computepipe
196 } // namespace automotive
197 } // namespace android
198