1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <grpc/grpc.h>
25 #include <grpc/support/sync.h>
26 
27 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
28 #include "src/core/ext/filters/client_channel/subchannel.h"
29 #include "src/core/lib/backoff/backoff.h"
30 #include "src/core/lib/gprpp/arena.h"
31 #include "src/core/lib/gprpp/atomic.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/gprpp/sync.h"
35 #include "src/core/lib/iomgr/call_combiner.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/polling_entity.h"
38 #include "src/core/lib/iomgr/timer.h"
39 #include "src/core/lib/transport/byte_stream.h"
40 #include "src/core/lib/transport/metadata_batch.h"
41 #include "src/core/lib/transport/transport.h"
42 
43 namespace grpc_core {
44 
45 class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
46  public:
47   HealthCheckClient(std::string service_name,
48                     RefCountedPtr<ConnectedSubchannel> connected_subchannel,
49                     grpc_pollset_set* interested_parties,
50                     RefCountedPtr<channelz::SubchannelNode> channelz_node,
51                     RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
52 
53   ~HealthCheckClient() override;
54 
55   void Orphan() override;
56 
57  private:
58   // Contains a call to the backend and all the data related to the call.
59   class CallState : public Orphanable {
60    public:
61     CallState(RefCountedPtr<HealthCheckClient> health_check_client,
62               grpc_pollset_set* interested_parties_);
63     ~CallState() override;
64 
65     void Orphan() override;
66 
67     void StartCall();
68 
69    private:
70     void Cancel();
71 
72     void StartBatch(grpc_transport_stream_op_batch* batch);
73     static void StartBatchInCallCombiner(void* arg, grpc_error* error);
74 
75     // Requires holding health_check_client_->mu_.
76     void CallEndedLocked(bool retry);
77 
78     static void OnComplete(void* arg, grpc_error* error);
79     static void RecvInitialMetadataReady(void* arg, grpc_error* error);
80     static void RecvMessageReady(void* arg, grpc_error* error);
81     static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
82     static void StartCancel(void* arg, grpc_error* error);
83     static void OnCancelComplete(void* arg, grpc_error* error);
84 
85     static void OnByteStreamNext(void* arg, grpc_error* error);
86     void ContinueReadingRecvMessage();
87     grpc_error* PullSliceFromRecvMessage();
88     void DoneReadingRecvMessage(grpc_error* error);
89 
90     static void AfterCallStackDestruction(void* arg, grpc_error* error);
91 
92     RefCountedPtr<HealthCheckClient> health_check_client_;
93     grpc_polling_entity pollent_;
94 
95     Arena* arena_;
96     grpc_core::CallCombiner call_combiner_;
97     grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
98 
99     // The streaming call to the backend. Always non-null.
100     // Refs are tracked manually; when the last ref is released, the
101     // CallState object will be automatically destroyed.
102     SubchannelCall* call_;
103 
104     grpc_transport_stream_op_batch_payload payload_;
105     grpc_transport_stream_op_batch batch_;
106     grpc_transport_stream_op_batch recv_message_batch_;
107     grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
108 
109     grpc_closure on_complete_;
110 
111     // send_initial_metadata
112     grpc_metadata_batch send_initial_metadata_;
113     grpc_linked_mdelem path_metadata_storage_;
114 
115     // send_message
116     ManualConstructor<SliceBufferByteStream> send_message_;
117 
118     // send_trailing_metadata
119     grpc_metadata_batch send_trailing_metadata_;
120 
121     // recv_initial_metadata
122     grpc_metadata_batch recv_initial_metadata_;
123     grpc_closure recv_initial_metadata_ready_;
124 
125     // recv_message
126     OrphanablePtr<ByteStream> recv_message_;
127     grpc_closure recv_message_ready_;
128     grpc_slice_buffer recv_message_buffer_;
129     Atomic<bool> seen_response_{false};
130 
131     // recv_trailing_metadata
132     grpc_metadata_batch recv_trailing_metadata_;
133     grpc_transport_stream_stats collect_stats_;
134     grpc_closure recv_trailing_metadata_ready_;
135 
136     // True if the cancel_stream batch has been started.
137     Atomic<bool> cancelled_{false};
138 
139     // Closure for call stack destruction.
140     grpc_closure after_call_stack_destruction_;
141   };
142 
143   void StartCall();
144   void StartCallLocked();  // Requires holding mu_.
145 
146   void StartRetryTimerLocked();  // Requires holding mu_.
147   static void OnRetryTimer(void* arg, grpc_error* error);
148 
149   void SetHealthStatus(grpc_connectivity_state state, const char* reason);
150   void SetHealthStatusLocked(grpc_connectivity_state state,
151                              const char* reason);  // Requires holding mu_.
152 
153   std::string service_name_;
154   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
155   grpc_pollset_set* interested_parties_;  // Do not own.
156   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
157 
158   Mutex mu_;
159   RefCountedPtr<ConnectivityStateWatcherInterface> watcher_;
160   bool shutting_down_ = false;
161 
162   // The data associated with the current health check call.  It holds a ref
163   // to this HealthCheckClient object.
164   OrphanablePtr<CallState> call_state_;
165 
166   // Call retry state.
167   BackOff retry_backoff_;
168   grpc_timer retry_timer_;
169   grpc_closure retry_timer_callback_;
170   bool retry_timer_callback_pending_ = false;
171 };
172 
173 }  // namespace grpc_core
174 
175 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */
176