1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <stdint.h>
22 #include <stdio.h>
23
24 #include "src/core/ext/filters/client_channel/health/health_check_client.h"
25
26 #include "upb/upb.hpp"
27
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/gprpp/sync.h"
30 #include "src/core/lib/slice/slice_internal.h"
31 #include "src/core/lib/transport/error_utils.h"
32 #include "src/core/lib/transport/status_metadata.h"
33 #include "src/proto/grpc/health/v1/health.upb.h"
34
35 #define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
36 #define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
37 #define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
38 #define HEALTH_CHECK_RECONNECT_JITTER 0.2
39
40 namespace grpc_core {
41
// Trace flag for health-check client activity; enable at runtime with
// GRPC_TRACE=health_check_client.
TraceFlag grpc_health_check_client_trace(false, "health_check_client");
43
44 //
45 // HealthCheckClient
46 //
47
HealthCheckClient::HealthCheckClient(
    std::string service_name,
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    RefCountedPtr<channelz::SubchannelNode> channelz_node,
    RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
    // Pass a trace label to the refcount base only when tracing is enabled,
    // so ref/unref events are logged under that name.
    : InternallyRefCounted<HealthCheckClient>(
          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
              ? "HealthCheckClient"
              : nullptr),
      service_name_(std::move(service_name)),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      channelz_node_(std::move(channelz_node)),
      watcher_(std::move(watcher)),
      // Backoff for retrying a failed health-check call: 1s initial,
      // 1.6x multiplier, 120s cap, 20% jitter (see macros at top of file).
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
              .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
              .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
  }
  // Pre-initialize the retry-timer closure; it is reused for every retry.
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  // Kick off the first health-check call immediately.
  StartCall();
}
78
// Destructor only logs; actual teardown (cancelling the call state and the
// retry timer) is done in Orphan() before the last ref is released.
HealthCheckClient::~HealthCheckClient() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
  }
}
84
// Thread-safe entry point: acquires mu_ and delegates to the locked variant.
void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
                                        const char* reason) {
  MutexLock lock(&mu_);
  SetHealthStatusLocked(state, reason);
}
90
SetHealthStatusLocked(grpc_connectivity_state state,const char * reason)91 void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
92 const char* reason) {
93 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
94 gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
95 ConnectivityStateName(state), reason);
96 }
97 if (watcher_ != nullptr) {
98 watcher_->Notify(state,
99 state == GRPC_CHANNEL_TRANSIENT_FAILURE
100 ? absl::Status(absl::StatusCode::kUnavailable, reason)
101 : absl::Status());
102 }
103 }
104
// Shuts down the health-check client: stops the in-flight call, drops the
// watcher, and cancels any pending retry timer, then releases the caller's
// strong ref.
void HealthCheckClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
  }
  {
    MutexLock lock(&mu_);
    // Set shutting_down_ first so that StartCallLocked() and the retry
    // timer callback become no-ops.
    shutting_down_ = true;
    watcher_.reset();
    call_state_.reset();
    // The timer callback holds its own ref (taken in StartRetryTimerLocked)
    // and releases it when invoked, so we only need to cancel here.
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}
120
// Thread-safe entry point: acquires mu_ and delegates to the locked variant.
void HealthCheckClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}
125
// Creates a new CallState and starts the health-watch call on it.
// No-op after shutdown.  Must be called while holding mu_.
void HealthCheckClient::StartCallLocked() {
  if (shutting_down_) return;
  // Any previous call must have been torn down (CallEndedLocked resets it)
  // before a new one is started.
  GPR_ASSERT(call_state_ == nullptr);
  // Report CONNECTING while the new watch call is being established.
  SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
            call_state_.get());
  }
  call_state_->StartCall();
}
137
StartRetryTimerLocked()138 void HealthCheckClient::StartRetryTimerLocked() {
139 SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
140 "health check call failed; will retry after backoff");
141 grpc_millis next_try = retry_backoff_.NextAttemptTime();
142 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
143 gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
144 grpc_millis timeout = next_try - ExecCtx::Get()->Now();
145 if (timeout > 0) {
146 gpr_log(GPR_INFO,
147 "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
148 timeout);
149 } else {
150 gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
151 this);
152 }
153 }
154 // Ref for callback, tracked manually.
155 Ref(DEBUG_LOCATION, "health_retry_timer").release();
156 retry_timer_callback_pending_ = true;
157 grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
158 }
159
OnRetryTimer(void * arg,grpc_error * error)160 void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
161 HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
162 {
163 MutexLock lock(&self->mu_);
164 self->retry_timer_callback_pending_ = false;
165 if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
166 self->call_state_ == nullptr) {
167 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
168 gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
169 self);
170 }
171 self->StartCallLocked();
172 }
173 }
174 self->Unref(DEBUG_LOCATION, "health_retry_timer");
175 }
176
177 //
178 // protobuf helpers
179 //
180
181 namespace {
182
EncodeRequest(const std::string & service_name,ManualConstructor<SliceBufferByteStream> * send_message)183 void EncodeRequest(const std::string& service_name,
184 ManualConstructor<SliceBufferByteStream>* send_message) {
185 upb::Arena arena;
186 grpc_health_v1_HealthCheckRequest* request_struct =
187 grpc_health_v1_HealthCheckRequest_new(arena.ptr());
188 grpc_health_v1_HealthCheckRequest_set_service(
189 request_struct,
190 upb_strview_make(service_name.data(), service_name.size()));
191 size_t buf_length;
192 char* buf = grpc_health_v1_HealthCheckRequest_serialize(
193 request_struct, arena.ptr(), &buf_length);
194 grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
195 memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
196 grpc_slice_buffer slice_buffer;
197 grpc_slice_buffer_init(&slice_buffer);
198 grpc_slice_buffer_add(&slice_buffer, request_slice);
199 send_message->Init(&slice_buffer, 0);
200 grpc_slice_buffer_destroy_internal(&slice_buffer);
201 }
202
203 // Returns true if healthy.
204 // If there was an error parsing the response, sets *error and returns false.
DecodeResponse(grpc_slice_buffer * slice_buffer,grpc_error ** error)205 bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
206 // If message is empty, assume unhealthy.
207 if (slice_buffer->length == 0) {
208 *error =
209 GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
210 return false;
211 }
212 // Concatenate the slices to form a single string.
213 std::unique_ptr<uint8_t> recv_message_deleter;
214 uint8_t* recv_message;
215 if (slice_buffer->count == 1) {
216 recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
217 } else {
218 recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
219 recv_message_deleter.reset(recv_message);
220 size_t offset = 0;
221 for (size_t i = 0; i < slice_buffer->count; ++i) {
222 memcpy(recv_message + offset,
223 GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
224 GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
225 offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
226 }
227 }
228 // Deserialize message.
229 upb::Arena arena;
230 grpc_health_v1_HealthCheckResponse* response_struct =
231 grpc_health_v1_HealthCheckResponse_parse(
232 reinterpret_cast<char*>(recv_message), slice_buffer->length,
233 arena.ptr());
234 if (response_struct == nullptr) {
235 // Can't parse message; assume unhealthy.
236 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
237 "cannot parse health check response");
238 return false;
239 }
240 int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
241 return status == grpc_health_v1_HealthCheckResponse_SERVING;
242 }
243
244 } // namespace
245
246 //
247 // HealthCheckClient::CallState
248 //
249
HealthCheckClient::CallState::CallState(
    RefCountedPtr<HealthCheckClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : health_check_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      // Arena sized using the subchannel's per-call allocation estimate.
      arena_(Arena::Create(health_check_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate())),
      payload_(context_) {}
258
// Destroys per-call state.  Only runs from AfterCallStackDestruction(),
// i.e. after the subchannel call stack itself has been destroyed.
HealthCheckClient::CallState::~CallState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
            health_check_client_.get(), this);
  }
  // Run the registered destructors for any per-call context entries.
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure. This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack. Also flush the closures on exec_ctx so that
  // filters that schedule cancel notification closures on exec_ctx do not
  // need to take a ref of the call stack to guarantee closure liveness.
  call_combiner_.SetNotifyOnCancel(nullptr);
  ExecCtx::Get()->Flush();
  // The arena must outlive everything allocated from it, so destroy last.
  arena_->Destroy();
}
279
// Called when the owning HealthCheckClient drops this call state.
// Cancels the call combiner and the call itself; the CallState is deleted
// later, once the call stack is destroyed (AfterCallStackDestruction).
void HealthCheckClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}
284
// Creates the subchannel call for /grpc.health.v1.Health/Watch and starts
// two batches on it: one carrying send_initial_metadata, send_message
// (the serialized HealthCheckRequest), send_trailing_metadata,
// recv_initial_metadata, and recv_message; and a separate batch for
// recv_trailing_metadata, whose callback signals the end of the call.
// Each callback's ref on the call is taken and released manually.
void HealthCheckClient::CallState::StartCall() {
  SubchannelCall::Args args = {
      health_check_client_->connected_subchannel_,
      &pollent_,
      GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
      gpr_get_cycle_counter(),  // start_time
      GRPC_MILLIS_INF_FUTURE,   // deadline
      arena_,
      context_,
      &call_combiner_,
  };
  grpc_error* error = GRPC_ERROR_NONE;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR,
            "HealthCheckClient %p CallState %p: error creating health "
            "checking call on subchannel (%s); will retry",
            health_check_client_.get(), this, grpc_error_string(error));
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  grpc_metadata_batch_init(&send_initial_metadata_);
  error = grpc_metadata_batch_add_head(
      &send_initial_metadata_, &path_metadata_storage_,
      grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
          GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
      GRPC_BATCH_PATH);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op: the serialized HealthCheckRequest.
  EncodeRequest(health_check_client_->service_name_, &send_message_);
  payload_.send_message.send_message.reset(send_message_.get());
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  grpc_metadata_batch_init(&send_trailing_metadata_);
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  grpc_metadata_batch_init(&recv_initial_metadata_);
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  grpc_metadata_batch_init(&recv_trailing_metadata_);
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref. When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}
382
StartBatchInCallCombiner(void * arg,grpc_error *)383 void HealthCheckClient::CallState::StartBatchInCallCombiner(
384 void* arg, grpc_error* /*error*/) {
385 grpc_transport_stream_op_batch* batch =
386 static_cast<grpc_transport_stream_op_batch*>(arg);
387 SubchannelCall* call =
388 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
389 call->StartTransportStreamOpBatch(batch);
390 }
391
// Schedules \a batch on the call stack via the call combiner, which
// serializes all batch starts for this call.
void HealthCheckClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  // Stash the call so StartBatchInCallCombiner can find it from the batch.
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}
400
AfterCallStackDestruction(void * arg,grpc_error *)401 void HealthCheckClient::CallState::AfterCallStackDestruction(
402 void* arg, grpc_error* /*error*/) {
403 HealthCheckClient::CallState* self =
404 static_cast<HealthCheckClient::CallState*>(arg);
405 delete self;
406 }
407
// Completion callback for the cancel_stream batch sent by StartCancel().
// Releases the call combiner and the "cancel" ref taken in Cancel().
void HealthCheckClient::CallState::OnCancelComplete(void* arg,
                                                    grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}
415
// Runs under the call combiner.  Sends a cancel_stream batch down the call
// stack; OnCancelComplete() runs when the batch finishes.
void HealthCheckClient::CallState::StartCancel(void* arg,
                                               grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}
426
// Cancels the call.  The CAS on cancelled_ guarantees the cancel batch is
// sent at most once even if Cancel() is invoked from multiple paths
// (Orphan(), DoneReadingRecvMessage()).
void HealthCheckClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
                                       MemoryOrder::ACQUIRE)) {
    // Ref released in OnCancelComplete().
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}
438
// on_complete callback for the batch started in StartCall() (the one
// carrying the send ops).  Destroys the send metadata batches and releases
// this callback's ref on the call.
void HealthCheckClient::CallState::OnComplete(void* arg,
                                              grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  grpc_metadata_batch_destroy(&self->send_initial_metadata_);
  grpc_metadata_batch_destroy(&self->send_trailing_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}
448
// The received initial metadata is not used by the health check; just
// destroy it and release this callback's ref on the call.
void HealthCheckClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  grpc_metadata_batch_destroy(&self->recv_initial_metadata_);
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}
457
DoneReadingRecvMessage(grpc_error * error)458 void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
459 recv_message_.reset();
460 if (error != GRPC_ERROR_NONE) {
461 GRPC_ERROR_UNREF(error);
462 Cancel();
463 grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
464 call_->Unref(DEBUG_LOCATION, "recv_message_ready");
465 return;
466 }
467 const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
468 const grpc_connectivity_state state =
469 healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
470 const char* reason = error == GRPC_ERROR_NONE && !healthy
471 ? "backend unhealthy"
472 : grpc_error_string(error);
473 health_check_client_->SetHealthStatus(state, reason);
474 seen_response_.Store(true, MemoryOrder::RELEASE);
475 grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
476 // Start another recv_message batch.
477 // This re-uses the ref we're holding.
478 // Note: Can't just reuse batch_ here, since we don't know that all
479 // callbacks from the original batch have completed yet.
480 recv_message_batch_.payload = &payload_;
481 payload_.recv_message.recv_message = &recv_message_;
482 payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
483 &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
484 recv_message_batch_.recv_message = true;
485 StartBatch(&recv_message_batch_);
486 }
487
PullSliceFromRecvMessage()488 grpc_error* HealthCheckClient::CallState::PullSliceFromRecvMessage() {
489 grpc_slice slice;
490 grpc_error* error = recv_message_->Pull(&slice);
491 if (error == GRPC_ERROR_NONE) {
492 grpc_slice_buffer_add(&recv_message_buffer_, slice);
493 }
494 return error;
495 }
496
// Synchronously pulls slices while the byte stream has data immediately
// available.  If Next() returns false, OnByteStreamNext() will be invoked
// asynchronously via recv_message_ready_ once more data arrives.
void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error* error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    // Stop once the whole message has been accumulated.
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}
510
// Async continuation for byte-stream reads: called when data becomes
// available after Next() previously returned false.  Pulls the ready slice
// and either finishes the message or resumes the synchronous read loop.
void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
                                                    grpc_error* error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    // DoneReadingRecvMessage() takes ownership, so add a ref to the
    // borrowed error.
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    self->ContinueReadingRecvMessage();
  }
}
530
// recv_message callback: a new response byte stream (or end-of-stream) is
// available.  Begins draining the stream into recv_message_buffer_.
void HealthCheckClient::CallState::RecvMessageReady(void* arg,
                                                    grpc_error* /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  // A null byte stream means no more messages; drop this callback's ref
  // on the call and stop reading.
  if (self->recv_message_ == nullptr) {
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  // Repurpose recv_message_ready_ as the byte stream's Next() callback.
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}
546
RecvTrailingMetadataReady(void * arg,grpc_error * error)547 void HealthCheckClient::CallState::RecvTrailingMetadataReady(
548 void* arg, grpc_error* error) {
549 HealthCheckClient::CallState* self =
550 static_cast<HealthCheckClient::CallState*>(arg);
551 GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
552 "recv_trailing_metadata_ready");
553 // Get call status.
554 grpc_status_code status = GRPC_STATUS_UNKNOWN;
555 if (error != GRPC_ERROR_NONE) {
556 grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
557 nullptr /* slice */, nullptr /* http_error */,
558 nullptr /* error_string */);
559 } else if (self->recv_trailing_metadata_.idx.named.grpc_status != nullptr) {
560 status = grpc_get_status_code_from_metadata(
561 self->recv_trailing_metadata_.idx.named.grpc_status->md);
562 }
563 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
564 gpr_log(GPR_INFO,
565 "HealthCheckClient %p CallState %p: health watch failed with "
566 "status %d",
567 self->health_check_client_.get(), self, status);
568 }
569 // Clean up.
570 grpc_metadata_batch_destroy(&self->recv_trailing_metadata_);
571 // For status UNIMPLEMENTED, give up and assume always healthy.
572 bool retry = true;
573 if (status == GRPC_STATUS_UNIMPLEMENTED) {
574 static const char kErrorMessage[] =
575 "health checking Watch method returned UNIMPLEMENTED; "
576 "disabling health checks but assuming server is healthy";
577 gpr_log(GPR_ERROR, kErrorMessage);
578 if (self->health_check_client_->channelz_node_ != nullptr) {
579 self->health_check_client_->channelz_node_->AddTraceEvent(
580 channelz::ChannelTrace::Error,
581 grpc_slice_from_static_string(kErrorMessage));
582 }
583 self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
584 kErrorMessage);
585 retry = false;
586 }
587 MutexLock lock(&self->health_check_client_->mu_);
588 self->CallEndedLocked(retry);
589 }
590
// Tears down this call on the owning HealthCheckClient and optionally
// arranges for a replacement call.  Must be called while holding the
// HealthCheckClient's mu_.
void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == health_check_client_->call_state_.get()) {
    health_check_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(!health_check_client_->shutting_down_);
      if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        health_check_client_->retry_backoff_.Reset();
        health_check_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        health_check_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}
615
616 } // namespace grpc_core
617