1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
22
23 namespace grpc {
24 namespace load_reporter {
25
Run(bool ok)26 void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
27 GPR_ASSERT(handler_function_ != nullptr);
28 GPR_ASSERT(handler_ != nullptr);
29 handler_function_(std::move(handler_), ok);
30 }
31
LoadReporterAsyncServiceImpl(std::unique_ptr<ServerCompletionQueue> cq)32 LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
33 std::unique_ptr<ServerCompletionQueue> cq)
34 : cq_(std::move(cq)) {
35 thread_ = std::unique_ptr<::grpc_core::Thread>(
36 new ::grpc_core::Thread("server_load_reporting", Work, this));
37 std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
38 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
39 cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
40 #endif
41 load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter(
42 kFeedbackSampleWindowSeconds,
43 std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
44 std::move(cpu_stats_provider)));
45 }
46
~LoadReporterAsyncServiceImpl()47 LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
48 // We will reach here after the server starts shutting down.
49 shutdown_ = true;
50 {
51 std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
52 cq_->Shutdown();
53 }
54 if (next_fetch_and_sample_alarm_ != nullptr)
55 next_fetch_and_sample_alarm_->Cancel();
56 thread_->Join();
57 }
58
ScheduleNextFetchAndSample()59 void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
60 auto next_fetch_and_sample_time =
61 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
62 gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
63 GPR_TIMESPAN));
64 {
65 std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
66 if (shutdown_) return;
67 // TODO(juanlishen): Improve the Alarm implementation to reuse a single
68 // instance for multiple events.
69 next_fetch_and_sample_alarm_.reset(new Alarm);
70 next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
71 this);
72 }
73 gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
74 }
75
FetchAndSample(bool ok)76 void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
77 if (!ok) {
78 gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
79 return;
80 }
81 gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
82 load_reporter_->FetchAndSample();
83 ScheduleNextFetchAndSample();
84 }
85
Work(void * arg)86 void LoadReporterAsyncServiceImpl::Work(void* arg) {
87 LoadReporterAsyncServiceImpl* service =
88 reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
89 service->FetchAndSample(true /* ok */);
90 // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
91 // to figure out why cq is not ready after service starts.
92 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93 gpr_time_from_seconds(1, GPR_TIMESPAN)));
94 ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
95 service->load_reporter_.get());
96 void* tag;
97 bool ok;
98 while (true) {
99 if (!service->cq_->Next(&tag, &ok)) {
100 // The completion queue is shutting down.
101 GPR_ASSERT(service->shutdown_);
102 break;
103 }
104 if (tag == service) {
105 service->FetchAndSample(ok);
106 } else {
107 auto* next_step = static_cast<CallableTag*>(tag);
108 next_step->Run(ok);
109 }
110 }
111 }
112
StartThread()113 void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
114
CreateAndStart(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)115 void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
116 ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
117 LoadReporter* load_reporter) {
118 std::shared_ptr<ReportLoadHandler> handler =
119 std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
120 ReportLoadHandler* p = handler.get();
121 {
122 std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
123 if (service->shutdown_) return;
124 p->on_done_notified_ =
125 CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
126 std::placeholders::_1, std::placeholders::_2),
127 handler);
128 p->next_inbound_ =
129 CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
130 std::placeholders::_1, std::placeholders::_2),
131 std::move(handler));
132 p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
133 service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
134 &p->next_inbound_);
135 }
136 }
137
ReportLoadHandler(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)138 LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
139 ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
140 LoadReporter* load_reporter)
141 : cq_(cq),
142 service_(service),
143 load_reporter_(load_reporter),
144 stream_(&ctx_),
145 call_status_(WAITING_FOR_DELIVERY) {}
146
OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self,bool ok)147 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
148 std::shared_ptr<ReportLoadHandler> self, bool ok) {
149 if (ok) {
150 call_status_ = DELIVERED;
151 } else {
152 // AsyncNotifyWhenDone() needs to be called before the call starts, but the
153 // tag will not pop out if the call never starts (
154 // https://github.com/grpc/grpc/issues/10136). So we need to manually
155 // release the ownership of the handler in this case.
156 GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
157 }
158 if (!ok || shutdown_) {
159 // The value of ok being false means that the server is shutting down.
160 Shutdown(std::move(self), "OnRequestDelivered");
161 return;
162 }
163 // Spawn a new handler instance to serve the next new client. Every handler
164 // instance will deallocate itself when it's done.
165 CreateAndStart(cq_, service_, load_reporter_);
166 {
167 std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
168 if (service_->shutdown_) {
169 lock.release()->unlock();
170 Shutdown(std::move(self), "OnRequestDelivered");
171 return;
172 }
173 next_inbound_ =
174 CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
175 std::placeholders::_1, std::placeholders::_2),
176 std::move(self));
177 stream_.Read(&request_, &next_inbound_);
178 }
179 // LB ID is unique for each load reporting stream.
180 lb_id_ = load_reporter_->GenerateLbId();
181 gpr_log(GPR_INFO,
182 "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
183 "Start reading the initial request...",
184 service_, lb_id_.c_str(), this);
185 }
186
OnReadDone(std::shared_ptr<ReportLoadHandler> self,bool ok)187 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
188 std::shared_ptr<ReportLoadHandler> self, bool ok) {
189 if (!ok || shutdown_) {
190 if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
191 // The client may have half-closed the stream or the stream is broken.
192 gpr_log(GPR_INFO,
193 "[LRS %p] Failed reading the initial request from the stream "
194 "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
195 service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
196 static_cast<int>(is_cancelled_));
197 }
198 Shutdown(std::move(self), "OnReadDone");
199 return;
200 }
201 // We only receive one request, which is the initial request.
202 if (call_status_ < INITIAL_REQUEST_RECEIVED) {
203 if (!request_.has_initial_request()) {
204 Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
205 } else {
206 call_status_ = INITIAL_REQUEST_RECEIVED;
207 const auto& initial_request = request_.initial_request();
208 load_balanced_hostname_ = initial_request.load_balanced_hostname();
209 load_key_ = initial_request.load_key();
210 load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
211 load_key_);
212 const auto& load_report_interval = initial_request.load_report_interval();
213 load_report_interval_ms_ =
214 static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
215 load_report_interval.nanos() / 1000);
216 gpr_log(
217 GPR_INFO,
218 "[LRS %p] Initial request received. Start load reporting (load "
219 "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...",
220 service_, load_balanced_hostname_.c_str(), load_report_interval_ms_,
221 lb_id_.c_str(), this);
222 SendReport(self, true /* ok */);
223 // Expect this read to fail.
224 {
225 std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
226 if (service_->shutdown_) {
227 lock.release()->unlock();
228 Shutdown(std::move(self), "OnReadDone");
229 return;
230 }
231 next_inbound_ =
232 CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
233 std::placeholders::_1, std::placeholders::_2),
234 std::move(self));
235 stream_.Read(&request_, &next_inbound_);
236 }
237 }
238 } else {
239 // Another request received! This violates the spec.
240 gpr_log(GPR_ERROR,
241 "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
242 service_, lb_id_.c_str(), this);
243 Shutdown(std::move(self), "OnReadDone+second_request");
244 }
245 }
246
ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self,bool ok)247 void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
248 std::shared_ptr<ReportLoadHandler> self, bool ok) {
249 if (!ok || shutdown_) {
250 Shutdown(std::move(self), "ScheduleNextReport");
251 return;
252 }
253 auto next_report_time = gpr_time_add(
254 gpr_now(GPR_CLOCK_MONOTONIC),
255 gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
256 {
257 std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
258 if (service_->shutdown_) {
259 lock.release()->unlock();
260 Shutdown(std::move(self), "ScheduleNextReport");
261 return;
262 }
263 next_outbound_ =
264 CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
265 std::placeholders::_1, std::placeholders::_2),
266 std::move(self));
267 // TODO(juanlishen): Improve the Alarm implementation to reuse a single
268 // instance for multiple events.
269 next_report_alarm_.reset(new Alarm);
270 next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
271 }
272 gpr_log(GPR_DEBUG,
273 "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
274 service_, lb_id_.c_str(), this);
275 }
276
SendReport(std::shared_ptr<ReportLoadHandler> self,bool ok)277 void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
278 std::shared_ptr<ReportLoadHandler> self, bool ok) {
279 if (!ok || shutdown_) {
280 Shutdown(std::move(self), "SendReport");
281 return;
282 }
283 ::grpc::lb::v1::LoadReportResponse response;
284 auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
285 response.mutable_load()->Swap(&loads);
286 auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
287 response.mutable_load_balancing_feedback()->Swap(&feedback);
288 if (call_status_ < INITIAL_RESPONSE_SENT) {
289 auto initial_response = response.mutable_initial_response();
290 initial_response->set_load_balancer_id(lb_id_);
291 initial_response->set_implementation_id(
292 ::grpc::lb::v1::InitialLoadReportResponse::CPP);
293 initial_response->set_server_version(kVersion);
294 call_status_ = INITIAL_RESPONSE_SENT;
295 }
296 {
297 std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
298 if (service_->shutdown_) {
299 lock.release()->unlock();
300 Shutdown(std::move(self), "SendReport");
301 return;
302 }
303 next_outbound_ =
304 CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
305 std::placeholders::_1, std::placeholders::_2),
306 std::move(self));
307 stream_.Write(response, &next_outbound_);
308 gpr_log(GPR_INFO,
309 "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
310 "count: %d)...",
311 service_, lb_id_.c_str(), this, response.load().size());
312 }
313 }
314
OnDoneNotified(std::shared_ptr<ReportLoadHandler> self,bool ok)315 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
316 std::shared_ptr<ReportLoadHandler> self, bool ok) {
317 GPR_ASSERT(ok);
318 done_notified_ = true;
319 if (ctx_.IsCancelled()) {
320 is_cancelled_ = true;
321 }
322 gpr_log(GPR_INFO,
323 "[LRS %p] Load reporting call is notified done (handler: %p, "
324 "is_cancelled: %d).",
325 service_, this, static_cast<int>(is_cancelled_));
326 Shutdown(std::move(self), "OnDoneNotified");
327 }
328
Shutdown(std::shared_ptr<ReportLoadHandler> self,const char * reason)329 void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
330 std::shared_ptr<ReportLoadHandler> self, const char* reason) {
331 if (!shutdown_) {
332 gpr_log(GPR_INFO,
333 "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
334 "reason: %s).",
335 service_, lb_id_.c_str(), this, reason);
336 shutdown_ = true;
337 if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
338 load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
339 next_report_alarm_->Cancel();
340 }
341 }
342 // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
343 // try to Finish() every time we are in Shutdown().
344 if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
345 std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
346 if (!service_->shutdown_) {
347 on_finish_done_ =
348 CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
349 std::placeholders::_1, std::placeholders::_2),
350 std::move(self));
351 // TODO(juanlishen): Maybe add a message proto for the client to
352 // explicitly cancel the stream so that we can return OK status in such
353 // cases.
354 stream_.Finish(Status::CANCELLED, &on_finish_done_);
355 call_status_ = FINISH_CALLED;
356 }
357 }
358 }
359
OnFinishDone(std::shared_ptr<ReportLoadHandler> self,bool ok)360 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
361 std::shared_ptr<ReportLoadHandler> self, bool ok) {
362 if (ok) {
363 gpr_log(GPR_INFO,
364 "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
365 service_, lb_id_.c_str(), this);
366 }
367 }
368
369 } // namespace load_reporter
370 } // namespace grpc
371