1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <grpc/support/log.h> 25 #include <grpcpp/alarm.h> 26 #include <grpcpp/grpcpp.h> 27 28 #include "src/core/lib/gprpp/thd.h" 29 #include "src/cpp/server/load_reporter/load_reporter.h" 30 31 namespace grpc { 32 namespace load_reporter { 33 34 // Async load reporting service. It's mainly responsible for controlling the 35 // procedure of incoming requests. The real business logic is handed off to the 36 // LoadReporter. There should be at most one instance of this service on a 37 // server to avoid spreading the load data into multiple places. 38 class LoadReporterAsyncServiceImpl 39 : public grpc::lb::v1::LoadReporter::AsyncService { 40 public: 41 explicit LoadReporterAsyncServiceImpl( 42 std::unique_ptr<ServerCompletionQueue> cq); 43 ~LoadReporterAsyncServiceImpl(); 44 45 // Starts the working thread. 46 void StartThread(); 47 48 // Not copyable nor movable. 49 LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete; 50 LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) = 51 delete; 52 53 private: 54 class ReportLoadHandler; 55 56 // A tag that can be called with a bool argument. It's tailored for 57 // ReportLoadHandler's use. Before being used, it should be constructed with a 58 // method of ReportLoadHandler and a shared pointer to the handler. The 59 // shared pointer will be moved to the invoked function and the function can 60 // only be invoked once. That makes ref counting of the handler easier, 61 // because the shared pointer is not bound to the function and can be gone 62 // once the invoked function returns (if not used any more). 63 class CallableTag { 64 public: 65 using HandlerFunction = 66 std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>; 67 CallableTag()68 CallableTag() {} 69 CallableTag(HandlerFunction func,std::shared_ptr<ReportLoadHandler> handler)70 CallableTag(HandlerFunction func, 71 std::shared_ptr<ReportLoadHandler> handler) 72 : handler_function_(std::move(func)), handler_(std::move(handler)) { 73 GPR_ASSERT(handler_function_ != nullptr); 74 GPR_ASSERT(handler_ != nullptr); 75 } 76 77 // Runs the tag. This should be called only once. The handler is no longer 78 // owned by this tag after this method is invoked. 79 void Run(bool ok); 80 81 // Releases and returns the shared pointer to the handler. ReleaseHandler()82 std::shared_ptr<ReportLoadHandler> ReleaseHandler() { 83 return std::move(handler_); 84 } 85 86 private: 87 HandlerFunction handler_function_ = nullptr; 88 std::shared_ptr<ReportLoadHandler> handler_; 89 }; 90 91 // Each handler takes care of one load reporting stream. It contains 92 // per-stream data and it will access the members of the parent class (i.e., 93 // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data). 94 class ReportLoadHandler { 95 public: 96 // Instantiates a ReportLoadHandler and requests the next load reporting 97 // call. The handler object will manage its own lifetime, so no action is 98 // needed from the caller any more regarding that object. 99 static void CreateAndStart(ServerCompletionQueue* cq, 100 LoadReporterAsyncServiceImpl* service, 101 LoadReporter* load_reporter); 102 103 // This ctor is public because we want to use std::make_shared<> in 104 // CreateAndStart(). This ctor shouldn't be used elsewhere. 105 ReportLoadHandler(ServerCompletionQueue* cq, 106 LoadReporterAsyncServiceImpl* service, 107 LoadReporter* load_reporter); 108 109 private: 110 // After the handler has a call request delivered, it starts reading the 111 // initial request. Also, a new handler is spawned so that we can keep 112 // servicing future calls. 113 void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok); 114 115 // The first Read() is expected to succeed, after which the handler starts 116 // sending load reports back to the balancer. The second Read() is 117 // expected to fail, which happens when the balancer half-closes the 118 // stream to signal that it's no longer interested in the load reports. For 119 // the latter case, the handler will then close the stream. 120 void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok); 121 122 // The report sending operations are sequential as: send report -> send 123 // done, schedule the next send -> waiting for the alarm to fire -> alarm 124 // fires, send report -> ... 125 void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok); 126 void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok); 127 128 // Called when Finish() is done. 129 void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok); 130 131 // Called when AsyncNotifyWhenDone() notifies us. 132 void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok); 133 134 void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason); 135 136 // The key fields of the stream. 137 grpc::string lb_id_; 138 grpc::string load_balanced_hostname_; 139 grpc::string load_key_; 140 uint64_t load_report_interval_ms_; 141 142 // The data for RPC communication with the load reportee. 143 ServerContext ctx_; 144 ::grpc::lb::v1::LoadReportRequest request_; 145 146 // The members passed down from LoadReporterAsyncServiceImpl. 147 ServerCompletionQueue* cq_; 148 LoadReporterAsyncServiceImpl* service_; 149 LoadReporter* load_reporter_; 150 ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse, 151 ::grpc::lb::v1::LoadReportRequest> 152 stream_; 153 154 // The status of the RPC progress. 155 enum CallStatus { 156 WAITING_FOR_DELIVERY, 157 DELIVERED, 158 INITIAL_REQUEST_RECEIVED, 159 INITIAL_RESPONSE_SENT, 160 FINISH_CALLED 161 } call_status_; 162 bool shutdown_{false}; 163 bool done_notified_{false}; 164 bool is_cancelled_{false}; 165 CallableTag on_done_notified_; 166 CallableTag on_finish_done_; 167 CallableTag next_inbound_; 168 CallableTag next_outbound_; 169 std::unique_ptr<Alarm> next_report_alarm_; 170 }; 171 172 // Handles the incoming requests and drives the completion queue in a loop. 173 static void Work(void* arg); 174 175 // Schedules the next data fetching from Census and LB feedback sampling. 176 void ScheduleNextFetchAndSample(); 177 178 // Fetches data from Census and samples LB feedback. 179 void FetchAndSample(bool ok); 180 181 std::unique_ptr<ServerCompletionQueue> cq_; 182 // To synchronize the operations related to shutdown state of cq_, so that we 183 // don't enqueue new tags into cq_ after it is already shut down. 184 std::mutex cq_shutdown_mu_; 185 std::atomic_bool shutdown_{false}; 186 std::unique_ptr<::grpc_core::Thread> thread_; 187 std::unique_ptr<LoadReporter> load_reporter_; 188 std::unique_ptr<Alarm> next_fetch_and_sample_alarm_; 189 }; 190 191 } // namespace load_reporter 192 } // namespace grpc 193 194 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H 195