1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h"
22 
23 namespace grpc {
24 namespace load_reporter {
25 
Run(bool ok)26 void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) {
27   GPR_ASSERT(handler_function_ != nullptr);
28   GPR_ASSERT(handler_ != nullptr);
29   handler_function_(std::move(handler_), ok);
30 }
31 
LoadReporterAsyncServiceImpl(std::unique_ptr<ServerCompletionQueue> cq)32 LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl(
33     std::unique_ptr<ServerCompletionQueue> cq)
34     : cq_(std::move(cq)) {
35   thread_ = std::unique_ptr<::grpc_core::Thread>(
36       new ::grpc_core::Thread("server_load_reporting", Work, this));
37   std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr;
38 #if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE)
39   cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl());
40 #endif
41   load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter(
42       kFeedbackSampleWindowSeconds,
43       std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()),
44       std::move(cpu_stats_provider)));
45 }
46 
~LoadReporterAsyncServiceImpl()47 LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
48   // We will reach here after the server starts shutting down.
49   shutdown_ = true;
50   {
51     std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
52     cq_->Shutdown();
53   }
54   if (next_fetch_and_sample_alarm_ != nullptr)
55     next_fetch_and_sample_alarm_->Cancel();
56   thread_->Join();
57 }
58 
ScheduleNextFetchAndSample()59 void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
60   auto next_fetch_and_sample_time =
61       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
62                    gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
63                                         GPR_TIMESPAN));
64   {
65     std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
66     if (shutdown_) return;
67     // TODO(juanlishen): Improve the Alarm implementation to reuse a single
68     // instance for multiple events.
69     next_fetch_and_sample_alarm_.reset(new Alarm);
70     next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time,
71                                       this);
72   }
73   gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this);
74 }
75 
FetchAndSample(bool ok)76 void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) {
77   if (!ok) {
78     gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this);
79     return;
80   }
81   gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this);
82   load_reporter_->FetchAndSample();
83   ScheduleNextFetchAndSample();
84 }
85 
Work(void * arg)86 void LoadReporterAsyncServiceImpl::Work(void* arg) {
87   LoadReporterAsyncServiceImpl* service =
88       reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg);
89   service->FetchAndSample(true /* ok */);
90   // TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
91   // to figure out why cq is not ready after service starts.
92   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93                                gpr_time_from_seconds(1, GPR_TIMESPAN)));
94   ReportLoadHandler::CreateAndStart(service->cq_.get(), service,
95                                     service->load_reporter_.get());
96   void* tag;
97   bool ok;
98   while (true) {
99     if (!service->cq_->Next(&tag, &ok)) {
100       // The completion queue is shutting down.
101       GPR_ASSERT(service->shutdown_);
102       break;
103     }
104     if (tag == service) {
105       service->FetchAndSample(ok);
106     } else {
107       auto* next_step = static_cast<CallableTag*>(tag);
108       next_step->Run(ok);
109     }
110   }
111 }
112 
StartThread()113 void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); }
114 
CreateAndStart(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)115 void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
116     ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
117     LoadReporter* load_reporter) {
118   std::shared_ptr<ReportLoadHandler> handler =
119       std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
120   ReportLoadHandler* p = handler.get();
121   {
122     std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
123     if (service->shutdown_) return;
124     p->on_done_notified_ =
125         CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
126                               std::placeholders::_1, std::placeholders::_2),
127                     handler);
128     p->next_inbound_ =
129         CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p,
130                               std::placeholders::_1, std::placeholders::_2),
131                     std::move(handler));
132     p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_);
133     service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq,
134                                &p->next_inbound_);
135   }
136 }
137 
ReportLoadHandler(ServerCompletionQueue * cq,LoadReporterAsyncServiceImpl * service,LoadReporter * load_reporter)138 LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler(
139     ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service,
140     LoadReporter* load_reporter)
141     : cq_(cq),
142       service_(service),
143       load_reporter_(load_reporter),
144       stream_(&ctx_),
145       call_status_(WAITING_FOR_DELIVERY) {}
146 
OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self,bool ok)147 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
148     std::shared_ptr<ReportLoadHandler> self, bool ok) {
149   if (ok) {
150     call_status_ = DELIVERED;
151   } else {
152     // AsyncNotifyWhenDone() needs to be called before the call starts, but the
153     // tag will not pop out if the call never starts (
154     // https://github.com/grpc/grpc/issues/10136). So we need to manually
155     // release the ownership of the handler in this case.
156     GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
157   }
158   if (!ok || shutdown_) {
159     // The value of ok being false means that the server is shutting down.
160     Shutdown(std::move(self), "OnRequestDelivered");
161     return;
162   }
163   // Spawn a new handler instance to serve the next new client. Every handler
164   // instance will deallocate itself when it's done.
165   CreateAndStart(cq_, service_, load_reporter_);
166   {
167     std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
168     if (service_->shutdown_) {
169       lock.release()->unlock();
170       Shutdown(std::move(self), "OnRequestDelivered");
171       return;
172     }
173     next_inbound_ =
174         CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
175                               std::placeholders::_1, std::placeholders::_2),
176                     std::move(self));
177     stream_.Read(&request_, &next_inbound_);
178   }
179   // LB ID is unique for each load reporting stream.
180   lb_id_ = load_reporter_->GenerateLbId();
181   gpr_log(GPR_INFO,
182           "[LRS %p] Call request delivered (lb_id_: %s, handler: %p). "
183           "Start reading the initial request...",
184           service_, lb_id_.c_str(), this);
185 }
186 
OnReadDone(std::shared_ptr<ReportLoadHandler> self,bool ok)187 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
188     std::shared_ptr<ReportLoadHandler> self, bool ok) {
189   if (!ok || shutdown_) {
190     if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) {
191       // The client may have half-closed the stream or the stream is broken.
192       gpr_log(GPR_INFO,
193               "[LRS %p] Failed reading the initial request from the stream "
194               "(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).",
195               service_, lb_id_.c_str(), this, static_cast<int>(done_notified_),
196               static_cast<int>(is_cancelled_));
197     }
198     Shutdown(std::move(self), "OnReadDone");
199     return;
200   }
201   // We only receive one request, which is the initial request.
202   if (call_status_ < INITIAL_REQUEST_RECEIVED) {
203     if (!request_.has_initial_request()) {
204       Shutdown(std::move(self), "OnReadDone+initial_request_not_found");
205     } else {
206       call_status_ = INITIAL_REQUEST_RECEIVED;
207       const auto& initial_request = request_.initial_request();
208       load_balanced_hostname_ = initial_request.load_balanced_hostname();
209       load_key_ = initial_request.load_key();
210       load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_,
211                                           load_key_);
212       const auto& load_report_interval = initial_request.load_report_interval();
213       load_report_interval_ms_ =
214           static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
215                                 load_report_interval.nanos() / 1000);
216       gpr_log(
217           GPR_INFO,
218           "[LRS %p] Initial request received. Start load reporting (load "
219           "balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...",
220           service_, load_balanced_hostname_.c_str(), load_report_interval_ms_,
221           lb_id_.c_str(), this);
222       SendReport(self, true /* ok */);
223       // Expect this read to fail.
224       {
225         std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
226         if (service_->shutdown_) {
227           lock.release()->unlock();
228           Shutdown(std::move(self), "OnReadDone");
229           return;
230         }
231         next_inbound_ =
232             CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this,
233                                   std::placeholders::_1, std::placeholders::_2),
234                         std::move(self));
235         stream_.Read(&request_, &next_inbound_);
236       }
237     }
238   } else {
239     // Another request received! This violates the spec.
240     gpr_log(GPR_ERROR,
241             "[LRS %p] Another request received (lb_id_: %s, handler: %p).",
242             service_, lb_id_.c_str(), this);
243     Shutdown(std::move(self), "OnReadDone+second_request");
244   }
245 }
246 
ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self,bool ok)247 void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
248     std::shared_ptr<ReportLoadHandler> self, bool ok) {
249   if (!ok || shutdown_) {
250     Shutdown(std::move(self), "ScheduleNextReport");
251     return;
252   }
253   auto next_report_time = gpr_time_add(
254       gpr_now(GPR_CLOCK_MONOTONIC),
255       gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
256   {
257     std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
258     if (service_->shutdown_) {
259       lock.release()->unlock();
260       Shutdown(std::move(self), "ScheduleNextReport");
261       return;
262     }
263     next_outbound_ =
264         CallableTag(std::bind(&ReportLoadHandler::SendReport, this,
265                               std::placeholders::_1, std::placeholders::_2),
266                     std::move(self));
267     // TODO(juanlishen): Improve the Alarm implementation to reuse a single
268     // instance for multiple events.
269     next_report_alarm_.reset(new Alarm);
270     next_report_alarm_->Set(cq_, next_report_time, &next_outbound_);
271   }
272   gpr_log(GPR_DEBUG,
273           "[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).",
274           service_, lb_id_.c_str(), this);
275 }
276 
SendReport(std::shared_ptr<ReportLoadHandler> self,bool ok)277 void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
278     std::shared_ptr<ReportLoadHandler> self, bool ok) {
279   if (!ok || shutdown_) {
280     Shutdown(std::move(self), "SendReport");
281     return;
282   }
283   ::grpc::lb::v1::LoadReportResponse response;
284   auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_);
285   response.mutable_load()->Swap(&loads);
286   auto feedback = load_reporter_->GenerateLoadBalancingFeedback();
287   response.mutable_load_balancing_feedback()->Swap(&feedback);
288   if (call_status_ < INITIAL_RESPONSE_SENT) {
289     auto initial_response = response.mutable_initial_response();
290     initial_response->set_load_balancer_id(lb_id_);
291     initial_response->set_implementation_id(
292         ::grpc::lb::v1::InitialLoadReportResponse::CPP);
293     initial_response->set_server_version(kVersion);
294     call_status_ = INITIAL_RESPONSE_SENT;
295   }
296   {
297     std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
298     if (service_->shutdown_) {
299       lock.release()->unlock();
300       Shutdown(std::move(self), "SendReport");
301       return;
302     }
303     next_outbound_ =
304         CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this,
305                               std::placeholders::_1, std::placeholders::_2),
306                     std::move(self));
307     stream_.Write(response, &next_outbound_);
308     gpr_log(GPR_INFO,
309             "[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads "
310             "count: %d)...",
311             service_, lb_id_.c_str(), this, response.load().size());
312   }
313 }
314 
OnDoneNotified(std::shared_ptr<ReportLoadHandler> self,bool ok)315 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified(
316     std::shared_ptr<ReportLoadHandler> self, bool ok) {
317   GPR_ASSERT(ok);
318   done_notified_ = true;
319   if (ctx_.IsCancelled()) {
320     is_cancelled_ = true;
321   }
322   gpr_log(GPR_INFO,
323           "[LRS %p] Load reporting call is notified done (handler: %p, "
324           "is_cancelled: %d).",
325           service_, this, static_cast<int>(is_cancelled_));
326   Shutdown(std::move(self), "OnDoneNotified");
327 }
328 
Shutdown(std::shared_ptr<ReportLoadHandler> self,const char * reason)329 void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
330     std::shared_ptr<ReportLoadHandler> self, const char* reason) {
331   if (!shutdown_) {
332     gpr_log(GPR_INFO,
333             "[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, "
334             "reason: %s).",
335             service_, lb_id_.c_str(), this, reason);
336     shutdown_ = true;
337     if (call_status_ >= INITIAL_REQUEST_RECEIVED) {
338       load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_);
339       next_report_alarm_->Cancel();
340     }
341   }
342   // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
343   // try to Finish() every time we are in Shutdown().
344   if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
345     std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
346     if (!service_->shutdown_) {
347       on_finish_done_ =
348           CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
349                                 std::placeholders::_1, std::placeholders::_2),
350                       std::move(self));
351       // TODO(juanlishen): Maybe add a message proto for the client to
352       // explicitly cancel the stream so that we can return OK status in such
353       // cases.
354       stream_.Finish(Status::CANCELLED, &on_finish_done_);
355       call_status_ = FINISH_CALLED;
356     }
357   }
358 }
359 
OnFinishDone(std::shared_ptr<ReportLoadHandler> self,bool ok)360 void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone(
361     std::shared_ptr<ReportLoadHandler> self, bool ok) {
362   if (ok) {
363     gpr_log(GPR_INFO,
364             "[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).",
365             service_, lb_id_.c_str(), this);
366   }
367 }
368 
369 }  // namespace load_reporter
370 }  // namespace grpc
371