1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <grpc/support/log.h>
25 #include <grpcpp/alarm.h>
26 #include <grpcpp/grpcpp.h>
27 
28 #include "src/core/lib/gprpp/thd.h"
29 #include "src/cpp/server/load_reporter/load_reporter.h"
30 
31 namespace grpc {
32 namespace load_reporter {
33 
34 // Async load reporting service. It's mainly responsible for controlling the
35 // procedure of incoming requests. The real business logic is handed off to the
36 // LoadReporter. There should be at most one instance of this service on a
37 // server to avoid spreading the load data into multiple places.
38 class LoadReporterAsyncServiceImpl
39     : public grpc::lb::v1::LoadReporter::AsyncService {
40  public:
41   explicit LoadReporterAsyncServiceImpl(
42       std::unique_ptr<ServerCompletionQueue> cq);
43   ~LoadReporterAsyncServiceImpl();
44 
45   // Starts the working thread.
46   void StartThread();
47 
48   // Not copyable nor movable.
49   LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
50   LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
51       delete;
52 
53  private:
54   class ReportLoadHandler;
55 
56   // A tag that can be called with a bool argument. It's tailored for
57   // ReportLoadHandler's use. Before being used, it should be constructed with a
58   // method of ReportLoadHandler and a shared pointer to the handler. The
59   // shared pointer will be moved to the invoked function and the function can
60   // only be invoked once. That makes ref counting of the handler easier,
61   // because the shared pointer is not bound to the function and can be gone
62   // once the invoked function returns (if not used any more).
63   class CallableTag {
64    public:
65     using HandlerFunction =
66         std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
67 
CallableTag()68     CallableTag() {}
69 
CallableTag(HandlerFunction func,std::shared_ptr<ReportLoadHandler> handler)70     CallableTag(HandlerFunction func,
71                 std::shared_ptr<ReportLoadHandler> handler)
72         : handler_function_(std::move(func)), handler_(std::move(handler)) {
73       GPR_ASSERT(handler_function_ != nullptr);
74       GPR_ASSERT(handler_ != nullptr);
75     }
76 
77     // Runs the tag. This should be called only once. The handler is no longer
78     // owned by this tag after this method is invoked.
79     void Run(bool ok);
80 
81     // Releases and returns the shared pointer to the handler.
ReleaseHandler()82     std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
83       return std::move(handler_);
84     }
85 
86    private:
87     HandlerFunction handler_function_ = nullptr;
88     std::shared_ptr<ReportLoadHandler> handler_;
89   };
90 
91   // Each handler takes care of one load reporting stream. It contains
92   // per-stream data and it will access the members of the parent class (i.e.,
93   // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
94   class ReportLoadHandler {
95    public:
96     // Instantiates a ReportLoadHandler and requests the next load reporting
97     // call. The handler object will manage its own lifetime, so no action is
98     // needed from the caller any more regarding that object.
99     static void CreateAndStart(ServerCompletionQueue* cq,
100                                LoadReporterAsyncServiceImpl* service,
101                                LoadReporter* load_reporter);
102 
103     // This ctor is public because we want to use std::make_shared<> in
104     // CreateAndStart(). This ctor shouldn't be used elsewhere.
105     ReportLoadHandler(ServerCompletionQueue* cq,
106                       LoadReporterAsyncServiceImpl* service,
107                       LoadReporter* load_reporter);
108 
109    private:
110     // After the handler has a call request delivered, it starts reading the
111     // initial request. Also, a new handler is spawned so that we can keep
112     // servicing future calls.
113     void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
114 
115     // The first Read() is expected to succeed, after which the handler starts
116     // sending load reports back to the balancer. The second Read() is
117     // expected to fail, which happens when the balancer half-closes the
118     // stream to signal that it's no longer interested in the load reports. For
119     // the latter case, the handler will then close the stream.
120     void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
121 
122     // The report sending operations are sequential as: send report -> send
123     // done, schedule the next send -> waiting for the alarm to fire -> alarm
124     // fires, send report -> ...
125     void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
126     void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
127 
128     // Called when Finish() is done.
129     void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
130 
131     // Called when AsyncNotifyWhenDone() notifies us.
132     void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
133 
134     void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
135 
136     // The key fields of the stream.
137     grpc::string lb_id_;
138     grpc::string load_balanced_hostname_;
139     grpc::string load_key_;
140     uint64_t load_report_interval_ms_;
141 
142     // The data for RPC communication with the load reportee.
143     ServerContext ctx_;
144     ::grpc::lb::v1::LoadReportRequest request_;
145 
146     // The members passed down from LoadReporterAsyncServiceImpl.
147     ServerCompletionQueue* cq_;
148     LoadReporterAsyncServiceImpl* service_;
149     LoadReporter* load_reporter_;
150     ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
151                             ::grpc::lb::v1::LoadReportRequest>
152         stream_;
153 
154     // The status of the RPC progress.
155     enum CallStatus {
156       WAITING_FOR_DELIVERY,
157       DELIVERED,
158       INITIAL_REQUEST_RECEIVED,
159       INITIAL_RESPONSE_SENT,
160       FINISH_CALLED
161     } call_status_;
162     bool shutdown_{false};
163     bool done_notified_{false};
164     bool is_cancelled_{false};
165     CallableTag on_done_notified_;
166     CallableTag on_finish_done_;
167     CallableTag next_inbound_;
168     CallableTag next_outbound_;
169     std::unique_ptr<Alarm> next_report_alarm_;
170   };
171 
172   // Handles the incoming requests and drives the completion queue in a loop.
173   static void Work(void* arg);
174 
175   // Schedules the next data fetching from Census and LB feedback sampling.
176   void ScheduleNextFetchAndSample();
177 
178   // Fetches data from Census and samples LB feedback.
179   void FetchAndSample(bool ok);
180 
181   std::unique_ptr<ServerCompletionQueue> cq_;
182   // To synchronize the operations related to shutdown state of cq_, so that we
183   // don't enqueue new tags into cq_ after it is already shut down.
184   std::mutex cq_shutdown_mu_;
185   std::atomic_bool shutdown_{false};
186   std::unique_ptr<::grpc_core::Thread> thread_;
187   std::unique_ptr<LoadReporter> load_reporter_;
188   std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
189 };
190 
191 }  // namespace load_reporter
192 }  // namespace grpc
193 
194 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
195