1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <deque>
25 
26 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
27 #include "src/core/ext/filters/client_channel/connector.h"
28 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
29 #include "src/core/lib/backoff/backoff.h"
30 #include "src/core/lib/channel/channel_stack.h"
31 #include "src/core/lib/gpr/time_precise.h"
32 #include "src/core/lib/gprpp/arena.h"
33 #include "src/core/lib/gprpp/ref_counted.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/iomgr/polling_entity.h"
37 #include "src/core/lib/iomgr/timer.h"
38 #include "src/core/lib/transport/connectivity_state.h"
39 #include "src/core/lib/transport/metadata.h"
40 
// Name of the channel arg whose value is a URI string giving the address
// the subchannel should connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
43 
// Ref-counting helper macros, for debugging refcounting.
//
// Every wrapper takes the object pointer \a p and a reason string \a r.
// When NDEBUG is not defined, the Ref/Unref/WeakRef/WeakUnref wrappers
// forward the call site's __FILE__ and __LINE__ together with \a r; when
// NDEBUG is defined, all three are dropped.  In both configurations
// GRPC_SUBCHANNEL_REF_FROM_WEAK_REF ignores \a r.
//
// GRPC_SUBCHANNEL_REF_EXTRA_ARGS is the parameter list to splice into the
// declarations of the methods the wrappers call, and
// GRPC_SUBCHANNEL_REF_REASON names the reason inside such a method.
//
// GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS / GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE
// both begin with a comma, so they are appended directly after the last
// regular parameter/argument.  GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE may only be
// used where `file`, `line` and `reason` (i.e. the parameters introduced by
// GRPC_SUBCHANNEL_REF_EXTRA_ARGS) are in scope.
#ifndef NDEBUG
#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
  const char* file, int line, const char* reason
#define GRPC_SUBCHANNEL_REF_REASON reason
#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
  , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#define GRPC_SUBCHANNEL_REF_REASON ""
#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
#endif
68 
69 namespace grpc_core {
70 
71 class SubchannelCall;
72 
73 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
74  public:
75   ConnectedSubchannel(
76       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
77       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
78   ~ConnectedSubchannel() override;
79 
80   void StartWatch(grpc_pollset_set* interested_parties,
81                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
82 
83   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
84 
channel_stack()85   grpc_channel_stack* channel_stack() const { return channel_stack_; }
args()86   const grpc_channel_args* args() const { return args_; }
channelz_subchannel()87   channelz::SubchannelNode* channelz_subchannel() const {
88     return channelz_subchannel_.get();
89   }
90 
91   size_t GetInitialCallSizeEstimate() const;
92 
93  private:
94   grpc_channel_stack* channel_stack_;
95   grpc_channel_args* args_;
96   // ref counted pointer to the channelz node in this connected subchannel's
97   // owning subchannel.
98   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
99 };
100 
101 // Implements the interface of RefCounted<>.
102 class SubchannelCall {
103  public:
104   struct Args {
105     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
106     grpc_polling_entity* pollent;
107     grpc_slice path;
108     gpr_cycle_counter start_time;
109     grpc_millis deadline;
110     Arena* arena;
111     grpc_call_context_element* context;
112     CallCombiner* call_combiner;
113   };
114   static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
115 
116   // Continues processing a transport stream op batch.
117   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
118 
119   // Returns the call stack of the subchannel call.
120   grpc_call_stack* GetCallStack();
121 
122   // Sets the 'then_schedule_closure' argument for call stack destruction.
123   // Must be called once per call.
124   void SetAfterCallStackDestroy(grpc_closure* closure);
125 
126   // Interface of RefCounted<>.
127   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
128   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
129                                     const char* reason) GRPC_MUST_USE_RESULT;
130   // When refcount drops to 0, destroys itself and the associated call stack,
131   // but does NOT free the memory because it's in the call arena.
132   void Unref();
133   void Unref(const DebugLocation& location, const char* reason);
134 
135  private:
136   // Allow RefCountedPtr<> to access IncrementRefCount().
137   template <typename T>
138   friend class RefCountedPtr;
139 
140   SubchannelCall(Args args, grpc_error** error);
141 
142   // If channelz is enabled, intercepts recv_trailing so that we may check the
143   // status and associate it to a subchannel.
144   void MaybeInterceptRecvTrailingMetadata(
145       grpc_transport_stream_op_batch* batch);
146 
147   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
148 
149   // Interface of RefCounted<>.
150   void IncrementRefCount();
151   void IncrementRefCount(const DebugLocation& location, const char* reason);
152 
153   static void Destroy(void* arg, grpc_error* error);
154 
155   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
156   grpc_closure* after_call_stack_destroy_ = nullptr;
157   // State needed to support channelz interception of recv trailing metadata.
158   grpc_closure recv_trailing_metadata_ready_;
159   grpc_closure* original_recv_trailing_metadata_ = nullptr;
160   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
161   grpc_millis deadline_;
162 };
163 
164 // A subchannel that knows how to connect to exactly one target address. It
165 // provides a target for load balancing.
166 //
167 // Note that this is the "real" subchannel implementation, whose API is
168 // different from the SubchannelInterface that is exposed to LB policy
169 // implementations.  The client channel provides an adaptor class
170 // (SubchannelWrapper) that "converts" between the two.
171 class Subchannel {
172  public:
173   class ConnectivityStateWatcherInterface
174       : public RefCounted<ConnectivityStateWatcherInterface> {
175    public:
176     struct ConnectivityStateChange {
177       grpc_connectivity_state state;
178       absl::Status status;
179       RefCountedPtr<ConnectedSubchannel> connected_subchannel;
180     };
181 
182     ~ConnectivityStateWatcherInterface() override = default;
183 
184     // Will be invoked whenever the subchannel's connectivity state
185     // changes.  There will be only one invocation of this method on a
186     // given watcher instance at any given time.
187     // Implementations should call PopConnectivityStateChange to get the next
188     // connectivity state change.
189     virtual void OnConnectivityStateChange() = 0;
190 
191     virtual grpc_pollset_set* interested_parties() = 0;
192 
193     // Enqueues connectivity state change notifications.
194     // When the state changes to READY, connected_subchannel will
195     // contain a ref to the connected subchannel.  When it changes from
196     // READY to some other state, the implementation must release its
197     // ref to the connected subchannel.
198     // TODO(yashkt): This is currently needed to send the state updates in the
199     // right order when asynchronously notifying. This will no longer be
200     // necessary when we have access to EventManager.
201     void PushConnectivityStateChange(ConnectivityStateChange state_change);
202 
203     // Dequeues connectivity state change notifications.
204     ConnectivityStateChange PopConnectivityStateChange();
205 
206    private:
207     // Keeps track of the updates that the watcher instance must be notified of.
208     // TODO(yashkt): This is currently needed to send the state updates in the
209     // right order when asynchronously notifying. This will no longer be
210     // necessary when we have access to EventManager.
211     std::deque<ConnectivityStateChange> connectivity_state_queue_;
212     Mutex mu_;  // protects the queue
213   };
214 
215   // The ctor and dtor are not intended to use directly.
216   Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector,
217              const grpc_channel_args* args);
218   ~Subchannel();
219 
220   // Creates a subchannel given \a connector and \a args.
221   static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector,
222                             const grpc_channel_args* args);
223 
224   // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
225   // is larger than the subchannel's current keepalive time. The updated value
226   // will have an affect when the subchannel creates a new ConnectedSubchannel.
227   void ThrottleKeepaliveTime(int new_keepalive_time);
228 
229   // Strong and weak refcounting.
230   Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
231   void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
232   Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
233   void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
234   // Attempts to return a strong ref when only the weak refcount is guaranteed
235   // non-zero. If the strong refcount is zero, does not alter the refcount and
236   // returns null.
237   Subchannel* RefFromWeakRef();
238 
239   // Gets the string representing the subchannel address.
240   // Caller doesn't take ownership.
241   const char* GetTargetAddress();
242 
channel_args()243   const grpc_channel_args* channel_args() const { return args_; }
244 
245   channelz::SubchannelNode* channelz_node();
246 
247   // Returns the current connectivity state of the subchannel.
248   // If health_check_service_name is non-null, the returned connectivity
249   // state will be based on the state reported by the backend for that
250   // service name.
251   // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
252   grpc_connectivity_state CheckConnectivityState(
253       const absl::optional<std::string>& health_check_service_name,
254       RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
255 
256   // Starts watching the subchannel's connectivity state.
257   // The first callback to the watcher will be delivered when the
258   // subchannel's connectivity state becomes a value other than
259   // initial_state, which may happen immediately.
260   // Subsequent callbacks will be delivered as the subchannel's state
261   // changes.
262   // The watcher will be destroyed either when the subchannel is
263   // destroyed or when CancelConnectivityStateWatch() is called.
264   void WatchConnectivityState(
265       grpc_connectivity_state initial_state,
266       const absl::optional<std::string>& health_check_service_name,
267       RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
268 
269   // Cancels a connectivity state watch.
270   // If the watcher has already been destroyed, this is a no-op.
271   void CancelConnectivityStateWatch(
272       const absl::optional<std::string>& health_check_service_name,
273       ConnectivityStateWatcherInterface* watcher);
274 
275   // Attempt to connect to the backend.  Has no effect if already connected.
276   void AttemptToConnect();
277 
278   // Resets the connection backoff of the subchannel.
279   // TODO(roth): Move connection backoff out of subchannels and up into LB
280   // policy code (probably by adding a SubchannelGroup between
281   // SubchannelList and SubchannelData), at which point this method can
282   // go away.
283   void ResetBackoff();
284 
285   // Returns a new channel arg encoding the subchannel address as a URI
286   // string. Caller is responsible for freeing the string.
287   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
288 
289   // Returns the URI string from the subchannel address arg in \a args.
290   static const char* GetUriFromSubchannelAddressArg(
291       const grpc_channel_args* args);
292 
293   // Sets \a addr from the subchannel address arg in \a args.
294   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
295                                                  grpc_resolved_address* addr);
296 
297  private:
298   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
299   // the subchannel's state.
300   class ConnectivityStateWatcherList {
301    public:
~ConnectivityStateWatcherList()302     ~ConnectivityStateWatcherList() { Clear(); }
303 
304     void AddWatcherLocked(
305         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
306     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
307 
308     // Notifies all watchers in the list about a change to state.
309     void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
310                       const absl::Status& status);
311 
Clear()312     void Clear() { watchers_.clear(); }
313 
empty()314     bool empty() const { return watchers_.empty(); }
315 
316    private:
317     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
318     // be a set instead of a map.
319     std::map<ConnectivityStateWatcherInterface*,
320              RefCountedPtr<ConnectivityStateWatcherInterface>>
321         watchers_;
322   };
323 
324   // A map that tracks ConnectivityStateWatcherInterfaces using a particular
325   // health check service name.
326   //
327   // There is one entry in the map for each health check service name.
328   // Entries exist only as long as there are watchers using the
329   // corresponding service name.
330   //
331   // A health check client is maintained only while the subchannel is in
332   // state READY.
333   class HealthWatcherMap {
334    public:
335     void AddWatcherLocked(
336         Subchannel* subchannel, grpc_connectivity_state initial_state,
337         const std::string& health_check_service_name,
338         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
339     void RemoveWatcherLocked(const std::string& health_check_service_name,
340                              ConnectivityStateWatcherInterface* watcher);
341 
342     // Notifies the watcher when the subchannel's state changes.
343     void NotifyLocked(grpc_connectivity_state state,
344                       const absl::Status& status);
345 
346     grpc_connectivity_state CheckConnectivityStateLocked(
347         Subchannel* subchannel, const std::string& health_check_service_name);
348 
349     void ShutdownLocked();
350 
351    private:
352     class HealthWatcher;
353 
354     std::map<std::string, OrphanablePtr<HealthWatcher>> map_;
355   };
356 
357   class ConnectedSubchannelStateWatcher;
358 
359   class AsyncWatcherNotifierLocked;
360 
361   // Sets the subchannel's connectivity state to \a state.
362   void SetConnectivityStateLocked(grpc_connectivity_state state,
363                                   const absl::Status& status);
364 
365   // Methods for connection.
366   void MaybeStartConnectingLocked();
367   static void OnRetryAlarm(void* arg, grpc_error* error);
368   void ContinueConnectingLocked();
369   static void OnConnectingFinished(void* arg, grpc_error* error);
370   bool PublishTransportLocked();
371   void Disconnect();
372 
373   gpr_atm RefMutate(gpr_atm delta,
374                     int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
375 
376   // The subchannel pool this subchannel is in.
377   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
378   // TODO(juanlishen): Consider using args_ as key_ directly.
379   // Subchannel key that identifies this subchannel in the subchannel pool.
380   SubchannelKey* key_;
381   // Channel args.
382   grpc_channel_args* args_;
383   // pollset_set tracking who's interested in a connection being setup.
384   grpc_pollset_set* pollset_set_;
385   // Protects the other members.
386   Mutex mu_;
387   // Refcount
388   //    - lower INTERNAL_REF_BITS bits are for internal references:
389   //      these do not keep the subchannel open.
390   //    - upper remaining bits are for public references: these do
391   //      keep the subchannel open
392   gpr_atm ref_pair_;
393 
394   // Connection states.
395   OrphanablePtr<SubchannelConnector> connector_;
396   // Set during connection.
397   SubchannelConnector::Result connecting_result_;
398   grpc_closure on_connecting_finished_;
399   // Active connection, or null.
400   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
401   bool connecting_ = false;
402   bool disconnected_ = false;
403 
404   // Connectivity state tracking.
405   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
406   absl::Status status_;
407   // The list of watchers without a health check service name.
408   ConnectivityStateWatcherList watcher_list_;
409   // The map of watchers with health check service names.
410   HealthWatcherMap health_watcher_map_;
411 
412   // Backoff state.
413   BackOff backoff_;
414   grpc_millis next_attempt_deadline_;
415   grpc_millis min_connect_timeout_ms_;
416   bool backoff_begun_ = false;
417 
418   // Retry alarm.
419   grpc_timer retry_alarm_;
420   grpc_closure on_retry_alarm_;
421   bool have_retry_alarm_ = false;
422   // reset_backoff() was called while alarm was pending.
423   bool retry_immediately_ = false;
424   // Keepalive time period (-1 for unset)
425   int keepalive_time_ = -1;
426 
427   // Channelz tracking.
428   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
429 };
430 
431 }  // namespace grpc_core
432 
433 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */
434