1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <mutex>
21 
22 #include <grpc/slice.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpcpp/impl/codegen/method_handler_impl.h>
26 
27 #include "pb_decode.h"
28 #include "pb_encode.h"
29 #include "src/cpp/server/health/default_health_check_service.h"
30 #include "src/cpp/server/health/health.pb.h"
31 
32 namespace grpc {
namespace {
// Fully-qualified method name under which the Check handler is registered.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
}  // namespace
36 
HealthCheckServiceImpl(DefaultHealthCheckService * service)37 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
38     DefaultHealthCheckService* service)
39     : service_(service), method_(nullptr) {
40   internal::MethodHandler* handler =
41       new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
42                                      ByteBuffer>(
43           std::mem_fn(&HealthCheckServiceImpl::Check), this);
44   method_ = new internal::RpcServiceMethod(
45       kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
46   AddMethod(method_);
47 }
48 
Check(ServerContext * context,const ByteBuffer * request,ByteBuffer * response)49 Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
50     ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
51   // Decode request.
52   std::vector<Slice> slices;
53   if (!request->Dump(&slices).ok()) {
54     return Status(StatusCode::INVALID_ARGUMENT, "");
55   }
56   uint8_t* request_bytes = nullptr;
57   bool request_bytes_owned = false;
58   size_t request_size = 0;
59   grpc_health_v1_HealthCheckRequest request_struct;
60   if (slices.empty()) {
61     request_struct.has_service = false;
62   } else if (slices.size() == 1) {
63     request_bytes = const_cast<uint8_t*>(slices[0].begin());
64     request_size = slices[0].size();
65   } else {
66     request_bytes_owned = true;
67     request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
68     uint8_t* copy_to = request_bytes;
69     for (size_t i = 0; i < slices.size(); i++) {
70       memcpy(copy_to, slices[i].begin(), slices[i].size());
71       copy_to += slices[i].size();
72     }
73   }
74 
75   if (request_bytes != nullptr) {
76     pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
77     bool decode_status = pb_decode(
78         &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
79     if (request_bytes_owned) {
80       gpr_free(request_bytes);
81     }
82     if (!decode_status) {
83       return Status(StatusCode::INVALID_ARGUMENT, "");
84     }
85   }
86 
87   // Check status from the associated default health checking service.
88   DefaultHealthCheckService::ServingStatus serving_status =
89       service_->GetServingStatus(
90           request_struct.has_service ? request_struct.service : "");
91   if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
92     return Status(StatusCode::NOT_FOUND, "");
93   }
94 
95   // Encode response
96   grpc_health_v1_HealthCheckResponse response_struct;
97   response_struct.has_status = true;
98   response_struct.status =
99       serving_status == DefaultHealthCheckService::SERVING
100           ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
101           : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
102   pb_ostream_t ostream;
103   memset(&ostream, 0, sizeof(ostream));
104   pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
105             &response_struct);
106   grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
107   ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
108                                    GRPC_SLICE_LENGTH(response_slice));
109   bool encode_status = pb_encode(
110       &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
111   if (!encode_status) {
112     return Status(StatusCode::INTERNAL, "Failed to encode response.");
113   }
114   Slice encoded_response(response_slice, Slice::STEAL_REF);
115   ByteBuffer response_buffer(&encoded_response, 1);
116   response->Swap(&response_buffer);
117   return Status::OK;
118 }
119 
DefaultHealthCheckService()120 DefaultHealthCheckService::DefaultHealthCheckService() {
121   services_map_.emplace("", true);
122 }
123 
SetServingStatus(const grpc::string & service_name,bool serving)124 void DefaultHealthCheckService::SetServingStatus(
125     const grpc::string& service_name, bool serving) {
126   std::lock_guard<std::mutex> lock(mu_);
127   services_map_[service_name] = serving;
128 }
129 
SetServingStatus(bool serving)130 void DefaultHealthCheckService::SetServingStatus(bool serving) {
131   std::lock_guard<std::mutex> lock(mu_);
132   for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
133     iter->second = serving;
134   }
135 }
136 
137 DefaultHealthCheckService::ServingStatus
GetServingStatus(const grpc::string & service_name) const138 DefaultHealthCheckService::GetServingStatus(
139     const grpc::string& service_name) const {
140   std::lock_guard<std::mutex> lock(mu_);
141   const auto& iter = services_map_.find(service_name);
142   if (iter == services_map_.end()) {
143     return NOT_FOUND;
144   }
145   return iter->second ? SERVING : NOT_SERVING;
146 }
147 
148 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService()149 DefaultHealthCheckService::GetHealthCheckService() {
150   GPR_ASSERT(impl_ == nullptr);
151   impl_.reset(new HealthCheckServiceImpl(this));
152   return impl_.get();
153 }
154 
155 }  // namespace grpc
156