/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include <stdint.h>
#include <stdio.h>

#include "src/core/ext/filters/client_channel/health/health_check_client.h"

#include "upb/upb.hpp"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/proto/grpc/health/v1/health.upb.h"

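// Backoff parameters used when retrying a health-check call that failed
// before any response was received (see retry_backoff_ below).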
#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
#define HEALTH_CHECK_RECONNECT_JITTER 0.2

namespace grpc_core {

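// Trace output can be enabled at runtime with
// GRPC_TRACE=health_check_client.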
TraceFlag grpc_health_check_client_trace(false, "health_check_client");

//
// HealthCheckClient
//

HealthCheckClient::HealthCheckClient(
    std::string service_name,
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    RefCountedPtr<channelz::SubchannelNode> channelz_node,
    RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
    : InternallyRefCounted<HealthCheckClient>(
          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
              ? "HealthCheckClient"
              : nullptr),
      service_name_(std::move(service_name)),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      channelz_node_(std::move(channelz_node)),
      watcher_(std::move(watcher)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
              .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
              .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
  }
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartCall();
}

HealthCheckClient::~HealthCheckClient() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
  }
}

void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
                                        const char* reason) {
  MutexLock lock(&mu_);
  SetHealthStatusLocked(state, reason);
}

void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
                                              const char* reason) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
            ConnectivityStateName(state), reason);
  }
  if (watcher_ != nullptr) {
    watcher_->Notify(state,
                     state == GRPC_CHANNEL_TRANSIENT_FAILURE
                         ? absl::Status(absl::StatusCode::kUnavailable, reason)
                         : absl::Status());
  }
}

void HealthCheckClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
  }
  {
    MutexLock lock(&mu_);
    shutting_down_ = true;
    watcher_.reset();
    call_state_.reset();
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void HealthCheckClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void HealthCheckClient::StartCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(call_state_ == nullptr);
  SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
            call_state_.get());
  }
  call_state_->StartCall();
}

void HealthCheckClient::StartRetryTimerLocked() {
  SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                        "health check call failed; will retry after backoff");
  grpc_millis next_try = retry_backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO,
              "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
              timeout);
    } else {
      gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
              this);
    }
  }
  // Ref for callback, tracked manually.
  Ref(DEBUG_LOCATION, "health_retry_timer").release();
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}

void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
  HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
  {
    MutexLock lock(&self->mu_);
    self->retry_timer_callback_pending_ = false;
    if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
        self->call_state_ == nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
        gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
                self);
      }
      self->StartCallLocked();
    }
  }
  self->Unref(DEBUG_LOCATION, "health_retry_timer");
}

//
// protobuf helpers
//

namespace {

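// Serializes a grpc.health.v1.HealthCheckRequest for service_name and
// stores it in *send_message as a single-slice byte stream.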
void EncodeRequest(const std::string& service_name,
                   ManualConstructor<SliceBufferByteStream>* send_message) {
  upb::Arena arena;
  grpc_health_v1_HealthCheckRequest* request_struct =
      grpc_health_v1_HealthCheckRequest_new(arena.ptr());
  grpc_health_v1_HealthCheckRequest_set_service(
      request_struct,
      upb_strview_make(service_name.data(), service_name.size()));
  size_t buf_length;
  char* buf = grpc_health_v1_HealthCheckRequest_serialize(
      request_struct, arena.ptr(), &buf_length);
  grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
  memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);
  grpc_slice_buffer_add(&slice_buffer, request_slice);
  send_message->Init(&slice_buffer, 0);
  grpc_slice_buffer_destroy_internal(&slice_buffer);
}

// Returns true if healthy.
// If there was an error parsing the response, sets *error and returns false.
bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
  // If message is empty, assume unhealthy.
  if (slice_buffer->length == 0) {
    *error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
    return false;
  }
  // Concatenate the slices to form a single string.
  // The buffer allocated below comes from gpr_malloc(), so it must be
  // released with gpr_free() rather than the default delete.
  struct RecvMessageDeleter {
    void operator()(uint8_t* p) const { gpr_free(p); }
  };
  std::unique_ptr<uint8_t, RecvMessageDeleter> recv_message_deleter;
  uint8_t* recv_message;
  if (slice_buffer->count == 1) {
    recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
  } else {
    recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
    recv_message_deleter.reset(recv_message);
    size_t offset = 0;
    for (size_t i = 0; i < slice_buffer->count; ++i) {
      memcpy(recv_message + offset,
             GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
             GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
      offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
    }
  }
  // Deserialize message.
  upb::Arena arena;
  grpc_health_v1_HealthCheckResponse* response_struct =
      grpc_health_v1_HealthCheckResponse_parse(
          reinterpret_cast<char*>(recv_message), slice_buffer->length,
          arena.ptr());
  if (response_struct == nullptr) {
    // Can't parse message; assume unhealthy.
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "cannot parse health check response");
    return false;
  }
  int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
  return status == grpc_health_v1_HealthCheckResponse_SERVING;
}

}  // namespace

//
// HealthCheckClient::CallState
//

HealthCheckClient::CallState::CallState(
    RefCountedPtr<HealthCheckClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : health_check_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(health_check_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate())),
      payload_(context_) {}

HealthCheckClient::CallState::~CallState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
            health_check_client_.get(), this);
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack. Also flush the closures on exec_ctx so that
  // filters that schedule cancel notification closures on exec_ctx do not
  // need to take a ref of the call stack to guarantee closure liveness.
  call_combiner_.SetNotifyOnCancel(nullptr);
  ExecCtx::Get()->Flush();
  arena_->Destroy();
}

void HealthCheckClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}

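// Creates the health-checking call on the connected subchannel
// (/grpc.health.v1.Health/Watch) and starts two batches: one carrying the
// send ops plus recv_initial_metadata and recv_message, and a separate
// batch for recv_trailing_metadata, which signals the end of the call.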
void HealthCheckClient::CallState::StartCall() {
  SubchannelCall::Args args = {
      health_check_client_->connected_subchannel_,
      &pollent_,
      GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
      gpr_get_cycle_counter(),  // start_time
      GRPC_MILLIS_INF_FUTURE,   // deadline
      arena_,
      context_,
      &call_combiner_,
  };
  grpc_error* error = GRPC_ERROR_NONE;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR,
            "HealthCheckClient %p CallState %p: error creating health "
            "checking call on subchannel (%s); will retry",
            health_check_client_.get(), this, grpc_error_string(error));
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  grpc_metadata_batch_init(&send_initial_metadata_);
  error = grpc_metadata_batch_add_head(
      &send_initial_metadata_, &path_metadata_storage_,
      grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
          GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
      GRPC_BATCH_PATH);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  EncodeRequest(health_check_client_->service_name_, &send_message_);
  payload_.send_message.send_message.reset(send_message_.get());
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  grpc_metadata_batch_init(&send_trailing_metadata_);
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  grpc_metadata_batch_init(&recv_initial_metadata_);
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  grpc_metadata_batch_init(&recv_trailing_metadata_);
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref.  When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

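// Batches on the subchannel call must be run under the call combiner, so
// StartBatch() schedules this closure via GRPC_CALL_COMBINER_START()
// rather than starting the transport batch directly.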
void HealthCheckClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error* /*error*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void HealthCheckClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}

void HealthCheckClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  delete self;
}

void HealthCheckClient::CallState::OnCancelComplete(void* arg,
                                                    grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void HealthCheckClient::CallState::StartCancel(void* arg,
                                               grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}

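// Sends a cancel_stream batch at most once, guarded by the cancelled_ flag;
// the "cancel" ref taken here is released in OnCancelComplete().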
void HealthCheckClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
                                       MemoryOrder::ACQUIRE)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}

void HealthCheckClient::CallState::OnComplete(void* arg,
                                              grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  grpc_metadata_batch_destroy(&self->send_initial_metadata_);
  grpc_metadata_batch_destroy(&self->send_trailing_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void HealthCheckClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  grpc_metadata_batch_destroy(&self->recv_initial_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

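// Called once the entire response message has been drained from the byte
// stream (or an error occurred).  On success, parses the response, updates
// the health status, and starts another recv_message op so that subsequent
// responses on the streaming call are processed.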
void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
  recv_message_.reset();
  if (error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    Cancel();
    grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
  const grpc_connectivity_state state =
      healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
  const char* reason = error == GRPC_ERROR_NONE && !healthy
                           ? "backend unhealthy"
                           : grpc_error_string(error);
  health_check_client_->SetHealthStatus(state, reason);
  seen_response_.Store(true, MemoryOrder::RELEASE);
  grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

grpc_error* HealthCheckClient::CallState::PullSliceFromRecvMessage() {
  grpc_slice slice;
  grpc_error* error = recv_message_->Pull(&slice);
  if (error == GRPC_ERROR_NONE) {
    grpc_slice_buffer_add(&recv_message_buffer_, slice);
  }
  return error;
}

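// Drains the byte stream.  Next() returns true when a slice can be pulled
// synchronously; otherwise the recv_message_ready_ closure (OnByteStreamNext)
// runs once more data becomes available.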
void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error* error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}

void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
                                                    grpc_error* error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    self->ContinueReadingRecvMessage();
  }
}

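// Called when the recv_message op completes.  recv_message_ is null when no
// message was received (typically because the stream has ended), in which
// case the "recv_message_ready" ref is released here.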
void HealthCheckClient::CallState::RecvMessageReady(void* arg,
                                                    grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  if (self->recv_message_ == nullptr) {
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}

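// End-of-call callback.  Determines the call status from the error or the
// trailing metadata.  UNIMPLEMENTED means the server does not expose the
// health-checking service, so health checking is disabled and the channel is
// reported READY; any other failure is retried via CallEndedLocked().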
void HealthCheckClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error* error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  } else if (self->recv_trailing_metadata_.idx.named.grpc_status != nullptr) {
    status = grpc_get_status_code_from_metadata(
        self->recv_trailing_metadata_.idx.named.grpc_status->md);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthCheckClient %p CallState %p: health watch failed with "
            "status %d",
            self->health_check_client_.get(), self, status);
  }
  // Clean up.
  grpc_metadata_batch_destroy(&self->recv_trailing_metadata_);
  // For status UNIMPLEMENTED, give up and assume always healthy.
  bool retry = true;
  if (status == GRPC_STATUS_UNIMPLEMENTED) {
    static const char kErrorMessage[] =
        "health checking Watch method returned UNIMPLEMENTED; "
        "disabling health checks but assuming server is healthy";
    gpr_log(GPR_ERROR, kErrorMessage);
    if (self->health_check_client_->channelz_node_ != nullptr) {
      self->health_check_client_->channelz_node_->AddTraceEvent(
          channelz::ChannelTrace::Error,
          grpc_slice_from_static_string(kErrorMessage));
    }
    self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
                                                kErrorMessage);
    retry = false;
  }
  MutexLock lock(&self->health_check_client_->mu_);
  self->CallEndedLocked(retry);
}

void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == health_check_client_->call_state_.get()) {
    health_check_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(!health_check_client_->shutting_down_);
      if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        health_check_client_->retry_backoff_.Reset();
        health_check_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        health_check_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core