1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ConnectivityState.IDLE;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY;
26 import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Stopwatch;
31 import com.google.common.base.Supplier;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.SettableFuture;
34 import io.grpc.Attributes;
35 import io.grpc.CallOptions;
36 import io.grpc.Channel;
37 import io.grpc.ClientCall;
38 import io.grpc.ClientInterceptor;
39 import io.grpc.ClientInterceptors;
40 import io.grpc.ClientStreamTracer;
41 import io.grpc.CompressorRegistry;
42 import io.grpc.ConnectivityState;
43 import io.grpc.ConnectivityStateInfo;
44 import io.grpc.Context;
45 import io.grpc.DecompressorRegistry;
46 import io.grpc.EquivalentAddressGroup;
47 import io.grpc.InternalChannelz;
48 import io.grpc.InternalChannelz.ChannelStats;
49 import io.grpc.InternalChannelz.ChannelTrace;
50 import io.grpc.InternalInstrumented;
51 import io.grpc.InternalLogId;
52 import io.grpc.InternalWithLogId;
53 import io.grpc.LoadBalancer;
54 import io.grpc.LoadBalancer.PickResult;
55 import io.grpc.LoadBalancer.PickSubchannelArgs;
56 import io.grpc.LoadBalancer.SubchannelPicker;
57 import io.grpc.ManagedChannel;
58 import io.grpc.Metadata;
59 import io.grpc.MethodDescriptor;
60 import io.grpc.NameResolver;
61 import io.grpc.Status;
62 import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
63 import io.grpc.internal.RetriableStream.ChannelBufferMeter;
64 import io.grpc.internal.RetriableStream.Throttle;
65 import java.net.URI;
66 import java.net.URISyntaxException;
67 import java.util.ArrayList;
68 import java.util.Collection;
69 import java.util.Collections;
70 import java.util.HashSet;
71 import java.util.List;
72 import java.util.Map;
73 import java.util.Set;
74 import java.util.concurrent.CountDownLatch;
75 import java.util.concurrent.Executor;
76 import java.util.concurrent.ScheduledFuture;
77 import java.util.concurrent.TimeUnit;
78 import java.util.concurrent.atomic.AtomicBoolean;
79 import java.util.logging.Level;
80 import java.util.logging.Logger;
81 import java.util.regex.Pattern;
82 import javax.annotation.CheckForNull;
83 import javax.annotation.Nullable;
84 import javax.annotation.concurrent.GuardedBy;
85 import javax.annotation.concurrent.ThreadSafe;
86 
87 /** A communication channel for making outgoing RPCs. */
88 @ThreadSafe
89 final class ManagedChannelImpl extends ManagedChannel implements
90     InternalInstrumented<ChannelStats> {
91   static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
92 
93   // Matching this pattern means the target string is a URI target or at least intended to be one.
94   // A URI target must be an absolute hierarchical URI.
95   // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
96   @VisibleForTesting
97   static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
98 
99   static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
100 
101   @VisibleForTesting
102   static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
103 
104   @VisibleForTesting
105   static final Status SHUTDOWN_NOW_STATUS =
106       Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
107 
108   @VisibleForTesting
109   static final Status SHUTDOWN_STATUS =
110       Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
111 
112   @VisibleForTesting
113   static final Status SUBCHANNEL_SHUTDOWN_STATUS =
114       Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
115 
116   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
117   private final String target;
118   private final NameResolver.Factory nameResolverFactory;
119   private final Attributes nameResolverParams;
120   private final LoadBalancer.Factory loadBalancerFactory;
121   private final ClientTransportFactory transportFactory;
122   private final Executor executor;
123   private final ObjectPool<? extends Executor> executorPool;
124   private final ObjectPool<? extends Executor> oobExecutorPool;
125   private final TimeProvider timeProvider;
126   private final int maxTraceEvents;
127 
128   private final ChannelExecutor channelExecutor = new PanicChannelExecutor();
129 
130   private boolean fullStreamDecompression;
131 
132   private final DecompressorRegistry decompressorRegistry;
133   private final CompressorRegistry compressorRegistry;
134 
135   private final Supplier<Stopwatch> stopwatchSupplier;
136   /** The timout before entering idle mode. */
137   private final long idleTimeoutMillis;
138 
139   private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
140 
141   private final ServiceConfigInterceptor serviceConfigInterceptor;
142 
143   private final BackoffPolicy.Provider backoffPolicyProvider;
144 
145   /**
146    * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
147    * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
148    * {@link RealChannel}.
149    */
150   private final Channel interceptorChannel;
151   @Nullable private final String userAgent;
152 
153   // Only null after channel is terminated. Must be assigned from the channelExecutor.
154   private NameResolver nameResolver;
155 
156   // Must be accessed from the channelExecutor.
157   private boolean nameResolverStarted;
158 
159   // null when channel is in idle mode.  Must be assigned from channelExecutor.
160   @Nullable
161   private LbHelperImpl lbHelper;
162 
163   // Must ONLY be assigned from updateSubchannelPicker(), which is called from channelExecutor.
164   // null if channel is in idle mode.
165   @Nullable
166   private volatile SubchannelPicker subchannelPicker;
167 
168   // Must be accessed from the channelExecutor
169   private boolean panicMode;
170 
171   // Must be mutated from channelExecutor
172   // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
173   // switch to a ConcurrentHashMap.
174   private final Set<InternalSubchannel> subchannels = new HashSet<InternalSubchannel>(16, .75f);
175 
176   // Must be mutated from channelExecutor
177   private final Set<OobChannel> oobChannels = new HashSet<OobChannel>(1, .75f);
178 
179   // reprocess() must be run from channelExecutor
180   private final DelayedClientTransport delayedTransport;
181   private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
182       = new UncommittedRetriableStreamsRegistry();
183 
184   // Shutdown states.
185   //
186   // Channel's shutdown process:
187   // 1. shutdown(): stop accepting new calls from applications
188   //   1a shutdown <- true
189   //   1b subchannelPicker <- null
190   //   1c delayedTransport.shutdown()
191   // 2. delayedTransport terminated: stop stream-creation functionality
192   //   2a terminating <- true
193   //   2b loadBalancer.shutdown()
194   //     * LoadBalancer will shutdown subchannels and OOB channels
195   //   2c loadBalancer <- null
196   //   2d nameResolver.shutdown()
197   //   2e nameResolver <- null
198   // 3. All subchannels and OOB channels terminated: Channel considered terminated
199 
200   private final AtomicBoolean shutdown = new AtomicBoolean(false);
201   // Must only be mutated and read from channelExecutor
202   private boolean shutdownNowed;
203   // Must be mutated from channelExecutor
204   private volatile boolean terminating;
205   // Must be mutated from channelExecutor
206   private volatile boolean terminated;
207   private final CountDownLatch terminatedLatch = new CountDownLatch(1);
208 
209   private final CallTracer.Factory callTracerFactory;
210   private final CallTracer channelCallTracer;
211   @CheckForNull
212   private final ChannelTracer channelTracer;
213   private final InternalChannelz channelz;
214   @CheckForNull
215   private Boolean haveBackends; // a flag for doing channel tracing when flipped
216   @Nullable
217   private Map<String, Object> lastServiceConfig; // used for channel tracing when value changed
218 
219   // One instance per channel.
220   private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
221 
222   @Nullable
223   private Throttle throttle;
224 
225   private final long perRpcBufferLimit;
226   private final long channelBufferLimit;
227 
228   // Temporary false flag that can skip the retry code path.
229   private final boolean retryEnabled;
230 
231   // Called from channelExecutor
232   private final ManagedClientTransport.Listener delayedTransportListener =
233       new DelayedTransportListener();
234 
235   // Must be called from channelExecutor
maybeShutdownNowSubchannels()236   private void maybeShutdownNowSubchannels() {
237     if (shutdownNowed) {
238       for (InternalSubchannel subchannel : subchannels) {
239         subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
240       }
241       for (OobChannel oobChannel : oobChannels) {
242         oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
243       }
244     }
245   }
246 
247   // Must be accessed from channelExecutor
248   @VisibleForTesting
249   final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
250 
251   @Override
getStats()252   public ListenableFuture<ChannelStats> getStats() {
253     final SettableFuture<ChannelStats> ret = SettableFuture.create();
254     final class StatsFetcher implements Runnable {
255       @Override
256       public void run() {
257         ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
258         channelCallTracer.updateBuilder(builder);
259         if (channelTracer != null) {
260           channelTracer.updateBuilder(builder);
261         }
262         builder.setTarget(target).setState(channelStateManager.getState());
263         List<InternalWithLogId> children = new ArrayList<>();
264         children.addAll(subchannels);
265         children.addAll(oobChannels);
266         builder.setSubchannels(children);
267         ret.set(builder.build());
268       }
269     }
270 
271     // subchannels and oobchannels can only be accessed from channelExecutor
272     channelExecutor.executeLater(new StatsFetcher()).drain();
273     return ret;
274   }
275 
276   @Override
getLogId()277   public InternalLogId getLogId() {
278     return logId;
279   }
280 
281   // Run from channelExecutor
282   private class IdleModeTimer implements Runnable {
283 
284     @Override
run()285     public void run() {
286       enterIdleMode();
287     }
288   }
289 
290   // Must be called from channelExecutor
shutdownNameResolverAndLoadBalancer(boolean verifyActive)291   private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
292     if (verifyActive) {
293       checkState(nameResolver != null, "nameResolver is null");
294       checkState(lbHelper != null, "lbHelper is null");
295     }
296     if (nameResolver != null) {
297       cancelNameResolverBackoff();
298       nameResolver.shutdown();
299       nameResolver = null;
300       nameResolverStarted = false;
301     }
302     if (lbHelper != null) {
303       lbHelper.lb.shutdown();
304       lbHelper = null;
305     }
306     subchannelPicker = null;
307   }
308 
309   /**
310    * Make the channel exit idle mode, if it's in it.
311    *
312    * <p>Must be called from channelExecutor
313    */
314   @VisibleForTesting
exitIdleMode()315   void exitIdleMode() {
316     if (shutdown.get() || panicMode) {
317       return;
318     }
319     if (inUseStateAggregator.isInUse()) {
320       // Cancel the timer now, so that a racing due timer will not put Channel on idleness
321       // when the caller of exitIdleMode() is about to use the returned loadBalancer.
322       cancelIdleTimer(false);
323     } else {
324       // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
325       // isInUse() == false, in which case we still need to schedule the timer.
326       rescheduleIdleTimer();
327     }
328     if (lbHelper != null) {
329       return;
330     }
331     logger.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
332     lbHelper = new LbHelperImpl(nameResolver);
333     lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
334 
335     NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
336     try {
337       nameResolver.start(listener);
338       nameResolverStarted = true;
339     } catch (Throwable t) {
340       listener.onError(Status.fromThrowable(t));
341     }
342   }
343 
344   // Must be run from channelExecutor
enterIdleMode()345   private void enterIdleMode() {
346     logger.log(Level.FINE, "[{0}] Entering idle mode", getLogId());
347     // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
348     // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
349     // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
350     // which are bugs.
351     shutdownNameResolverAndLoadBalancer(true);
352     delayedTransport.reprocess(null);
353     nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
354     if (channelTracer != null) {
355       channelTracer.reportEvent(
356           new ChannelTrace.Event.Builder()
357               .setDescription("Entering IDLE state")
358               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
359               .setTimestampNanos(timeProvider.currentTimeNanos())
360               .build());
361     }
362     channelStateManager.gotoState(IDLE);
363     if (inUseStateAggregator.isInUse()) {
364       exitIdleMode();
365     }
366   }
367 
368   // Must be run from channelExecutor
cancelIdleTimer(boolean permanent)369   private void cancelIdleTimer(boolean permanent) {
370     idleTimer.cancel(permanent);
371   }
372 
373   // Always run from channelExecutor
rescheduleIdleTimer()374   private void rescheduleIdleTimer() {
375     if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
376       return;
377     }
378     idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
379   }
380 
381   // Run from channelExecutor
382   @VisibleForTesting
383   class NameResolverRefresh implements Runnable {
384     // Only mutated from channelExecutor
385     boolean cancelled;
386 
387     @Override
run()388     public void run() {
389       if (cancelled) {
390         // Race detected: this task was scheduled on channelExecutor before
391         // cancelNameResolverBackoff() could cancel the timer.
392         return;
393       }
394       nameResolverRefreshFuture = null;
395       nameResolverRefresh = null;
396       if (nameResolver != null) {
397         nameResolver.refresh();
398       }
399     }
400   }
401 
402   // Must be used from channelExecutor
403   @Nullable private ScheduledFuture<?> nameResolverRefreshFuture;
404   // Must be used from channelExecutor
405   @Nullable private NameResolverRefresh nameResolverRefresh;
406   // The policy to control backoff between name resolution attempts. Non-null when an attempt is
407   // scheduled. Must be used from channelExecutor
408   @Nullable private BackoffPolicy nameResolverBackoffPolicy;
409 
410   // Must be run from channelExecutor
cancelNameResolverBackoff()411   private void cancelNameResolverBackoff() {
412     if (nameResolverRefreshFuture != null) {
413       nameResolverRefreshFuture.cancel(false);
414       nameResolverRefresh.cancelled = true;
415       nameResolverRefreshFuture = null;
416       nameResolverRefresh = null;
417       nameResolverBackoffPolicy = null;
418     }
419   }
420 
421   private final class ChannelTransportProvider implements ClientTransportProvider {
422     @Override
get(PickSubchannelArgs args)423     public ClientTransport get(PickSubchannelArgs args) {
424       SubchannelPicker pickerCopy = subchannelPicker;
425       if (shutdown.get()) {
426         // If channel is shut down, delayedTransport is also shut down which will fail the stream
427         // properly.
428         return delayedTransport;
429       }
430       if (pickerCopy == null) {
431         final class ExitIdleModeForTransport implements Runnable {
432           @Override
433           public void run() {
434             exitIdleMode();
435           }
436         }
437 
438         channelExecutor.executeLater(new ExitIdleModeForTransport()).drain();
439         return delayedTransport;
440       }
441       // There is no need to reschedule the idle timer here.
442       //
443       // pickerCopy != null, which means idle timer has not expired when this method starts.
444       // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
445       // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
446       // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
447       //
448       // In most cases the idle timer is scheduled to fire after the transport has created the
449       // stream, which would have reported in-use state to the channel that would have cancelled
450       // the idle timer.
451       PickResult pickResult = pickerCopy.pickSubchannel(args);
452       ClientTransport transport = GrpcUtil.getTransportFromPickResult(
453           pickResult, args.getCallOptions().isWaitForReady());
454       if (transport != null) {
455         return transport;
456       }
457       return delayedTransport;
458     }
459 
460     @Override
newRetriableStream( final MethodDescriptor<ReqT, ?> method, final CallOptions callOptions, final Metadata headers, final Context context)461     public <ReqT> RetriableStream<ReqT> newRetriableStream(
462         final MethodDescriptor<ReqT, ?> method,
463         final CallOptions callOptions,
464         final Metadata headers,
465         final Context context) {
466       checkState(retryEnabled, "retry should be enabled");
467       final class RetryStream extends RetriableStream<ReqT> {
468         RetryStream() {
469           super(
470               method,
471               headers,
472               channelBufferUsed,
473               perRpcBufferLimit,
474               channelBufferLimit,
475               getCallExecutor(callOptions),
476               transportFactory.getScheduledExecutorService(),
477               callOptions.getOption(RETRY_POLICY_KEY),
478               callOptions.getOption(HEDGING_POLICY_KEY),
479               throttle);
480         }
481 
482         @Override
483         Status prestart() {
484           return uncommittedRetriableStreamsRegistry.add(this);
485         }
486 
487         @Override
488         void postCommit() {
489           uncommittedRetriableStreamsRegistry.remove(this);
490         }
491 
492         @Override
493         ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) {
494           CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory);
495           ClientTransport transport =
496               get(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
497           Context origContext = context.attach();
498           try {
499             return transport.newStream(method, newHeaders, newOptions);
500           } finally {
501             context.detach(origContext);
502           }
503         }
504       }
505 
506       return new RetryStream();
507     }
508   }
509 
510   private final ClientTransportProvider transportProvider = new ChannelTransportProvider();
511 
512   private final Rescheduler idleTimer;
513 
ManagedChannelImpl( AbstractManagedChannelImplBuilder<?> builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, ObjectPool<? extends Executor> oobExecutorPool, Supplier<Stopwatch> stopwatchSupplier, List<ClientInterceptor> interceptors, final TimeProvider timeProvider)514   ManagedChannelImpl(
515       AbstractManagedChannelImplBuilder<?> builder,
516       ClientTransportFactory clientTransportFactory,
517       BackoffPolicy.Provider backoffPolicyProvider,
518       ObjectPool<? extends Executor> oobExecutorPool,
519       Supplier<Stopwatch> stopwatchSupplier,
520       List<ClientInterceptor> interceptors,
521       final TimeProvider timeProvider) {
522     this.target = checkNotNull(builder.target, "target");
523     this.nameResolverFactory = builder.getNameResolverFactory();
524     this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
525     this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
526     this.timeProvider = checkNotNull(timeProvider, "timeProvider");
527     maxTraceEvents = builder.maxTraceEvents;
528     if (maxTraceEvents > 0) {
529       long currentTimeNanos = timeProvider.currentTimeNanos();
530       channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel");
531     } else {
532       channelTracer = null;
533     }
534     if (builder.loadBalancerFactory == null) {
535       this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(channelTracer, timeProvider);
536     } else {
537       this.loadBalancerFactory = builder.loadBalancerFactory;
538     }
539     this.executorPool = checkNotNull(builder.executorPool, "executorPool");
540     this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
541     this.executor = checkNotNull(executorPool.getObject(), "executor");
542     this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
543     this.delayedTransport.start(delayedTransportListener);
544     this.backoffPolicyProvider = backoffPolicyProvider;
545     this.transportFactory =
546         new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
547     this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
548     serviceConfigInterceptor = new ServiceConfigInterceptor(
549         retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
550     Channel channel = new RealChannel(nameResolver.getServiceAuthority());
551     channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
552     if (builder.binlog != null) {
553       channel = builder.binlog.wrapChannel(channel);
554     }
555     this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
556     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
557     if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
558       this.idleTimeoutMillis = builder.idleTimeoutMillis;
559     } else {
560       checkArgument(
561           builder.idleTimeoutMillis
562               >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
563           "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
564       this.idleTimeoutMillis = builder.idleTimeoutMillis;
565     }
566 
567     final class AutoDrainChannelExecutor implements Executor {
568 
569       @Override
570       public void execute(Runnable command) {
571         channelExecutor.executeLater(command);
572         channelExecutor.drain();
573       }
574     }
575 
576     idleTimer = new Rescheduler(
577         new IdleModeTimer(),
578         new AutoDrainChannelExecutor(),
579         transportFactory.getScheduledExecutorService(),
580         stopwatchSupplier.get());
581     this.fullStreamDecompression = builder.fullStreamDecompression;
582     this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
583     this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
584     this.userAgent = builder.userAgent;
585 
586     this.channelBufferLimit = builder.retryBufferSize;
587     this.perRpcBufferLimit = builder.perRpcBufferLimit;
588     final class ChannelCallTracerFactory implements CallTracer.Factory {
589       @Override
590       public CallTracer create() {
591         return new CallTracer(timeProvider);
592       }
593     }
594 
595     this.callTracerFactory = new ChannelCallTracerFactory();
596     channelCallTracer = callTracerFactory.create();
597     this.channelz = checkNotNull(builder.channelz);
598     channelz.addRootChannel(this);
599 
600     logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
601   }
602 
603   @VisibleForTesting
getNameResolver(String target, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams)604   static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
605       Attributes nameResolverParams) {
606     // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
607     // "dns:///".
608     URI targetUri = null;
609     StringBuilder uriSyntaxErrors = new StringBuilder();
610     try {
611       targetUri = new URI(target);
612       // For "localhost:8080" this would likely cause newNameResolver to return null, because
613       // "localhost" is parsed as the scheme. Will fall into the next branch and try
614       // "dns:///localhost:8080".
615     } catch (URISyntaxException e) {
616       // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
617       uriSyntaxErrors.append(e.getMessage());
618     }
619     if (targetUri != null) {
620       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
621       if (resolver != null) {
622         return resolver;
623       }
624       // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
625       // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
626     }
627 
628     // If we reached here, the targetUri couldn't be used.
629     if (!URI_PATTERN.matcher(target).matches()) {
630       // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
631       // scheme from the factory.
632       try {
633         targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
634       } catch (URISyntaxException e) {
635         // Should not be possible.
636         throw new IllegalArgumentException(e);
637       }
638       NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverParams);
639       if (resolver != null) {
640         return resolver;
641       }
642     }
643     throw new IllegalArgumentException(String.format(
644         "cannot find a NameResolver for %s%s",
645         target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
646   }
647 
648   /**
649    * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
650    * cancelled.
651    */
652   @Override
shutdown()653   public ManagedChannelImpl shutdown() {
654     logger.log(Level.FINE, "[{0}] shutdown() called", getLogId());
655     if (!shutdown.compareAndSet(false, true)) {
656       return this;
657     }
658 
659     // Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible.
660     // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
661     // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
662     // channelExecutor's queue and should not be blocked, so we do not drain() immediately here.
663     final class Shutdown implements Runnable {
664       @Override
665       public void run() {
666         if (channelTracer != null) {
667           channelTracer.reportEvent(new ChannelTrace.Event.Builder()
668               .setDescription("Entering SHUTDOWN state")
669               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
670               .setTimestampNanos(timeProvider.currentTimeNanos())
671               .build());
672         }
673         channelStateManager.gotoState(SHUTDOWN);
674       }
675     }
676 
677     channelExecutor.executeLater(new Shutdown());
678 
679     uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
680     final class CancelIdleTimer implements Runnable {
681       @Override
682       public void run() {
683         cancelIdleTimer(/* permanent= */ true);
684       }
685     }
686 
687     channelExecutor.executeLater(new CancelIdleTimer()).drain();
688     logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
689     return this;
690   }
691 
692   /**
693    * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
694    * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
695    * return {@code false} immediately after this method returns.
696    */
697   @Override
shutdownNow()698   public ManagedChannelImpl shutdownNow() {
699     logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
700     shutdown();
701     uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
702     final class ShutdownNow implements Runnable {
703       @Override
704       public void run() {
705         if (shutdownNowed) {
706           return;
707         }
708         shutdownNowed = true;
709         maybeShutdownNowSubchannels();
710       }
711     }
712 
713     channelExecutor.executeLater(new ShutdownNow()).drain();
714     return this;
715   }
716 
717   // Called from channelExecutor
718   @VisibleForTesting
panic(final Throwable t)719   void panic(final Throwable t) {
720     if (panicMode) {
721       // Preserve the first panic information
722       return;
723     }
724     panicMode = true;
725     cancelIdleTimer(/* permanent= */ true);
726     shutdownNameResolverAndLoadBalancer(false);
727     final class PanicSubchannelPicker extends SubchannelPicker {
728       private final PickResult panicPickResult =
729           PickResult.withDrop(
730               Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
731 
732       @Override
733       public PickResult pickSubchannel(PickSubchannelArgs args) {
734         return panicPickResult;
735       }
736     }
737 
738     updateSubchannelPicker(new PanicSubchannelPicker());
739     if (channelTracer != null) {
740       channelTracer.reportEvent(
741           new ChannelTrace.Event.Builder()
742               .setDescription("Entering TRANSIENT_FAILURE state")
743               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
744               .setTimestampNanos(timeProvider.currentTimeNanos())
745               .build());
746     }
747     channelStateManager.gotoState(TRANSIENT_FAILURE);
748   }
749 
750   // Called from channelExecutor
updateSubchannelPicker(SubchannelPicker newPicker)751   private void updateSubchannelPicker(SubchannelPicker newPicker) {
752     subchannelPicker = newPicker;
753     delayedTransport.reprocess(newPicker);
754   }
755 
756   @Override
isShutdown()757   public boolean isShutdown() {
758     return shutdown.get();
759   }
760 
761   @Override
awaitTermination(long timeout, TimeUnit unit)762   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
763     return terminatedLatch.await(timeout, unit);
764   }
765 
766   @Override
isTerminated()767   public boolean isTerminated() {
768     return terminated;
769   }
770 
771   /*
772    * Creates a new outgoing call on the channel.
773    */
774   @Override
newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)775   public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
776       CallOptions callOptions) {
777     return interceptorChannel.newCall(method, callOptions);
778   }
779 
780   @Override
authority()781   public String authority() {
782     return interceptorChannel.authority();
783   }
784 
getCallExecutor(CallOptions callOptions)785   private Executor getCallExecutor(CallOptions callOptions) {
786     Executor executor = callOptions.getExecutor();
787     if (executor == null) {
788       executor = this.executor;
789     }
790     return executor;
791   }
792 
793   private class RealChannel extends Channel {
794     // Set when the NameResolver is initially created. When we create a new NameResolver for the
795     // same target, the new instance must have the same value.
796     private final String authority;
797 
RealChannel(String authority)798     private RealChannel(String authority) {
799       this.authority =  checkNotNull(authority, "authority");
800     }
801 
802     @Override
newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions)803     public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
804         CallOptions callOptions) {
805       return new ClientCallImpl<ReqT, RespT>(
806               method,
807               getCallExecutor(callOptions),
808               callOptions,
809               transportProvider,
810               terminated ? null : transportFactory.getScheduledExecutorService(),
811               channelCallTracer,
812               retryEnabled)
813           .setFullStreamDecompression(fullStreamDecompression)
814           .setDecompressorRegistry(decompressorRegistry)
815           .setCompressorRegistry(compressorRegistry);
816     }
817 
818     @Override
authority()819     public String authority() {
820       return authority;
821     }
822   }
823 
824   /**
825    * Terminate the channel if termination conditions are met.
826    */
827   // Must be run from channelExecutor
maybeTerminateChannel()828   private void maybeTerminateChannel() {
829     if (terminated) {
830       return;
831     }
832     if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
833       logger.log(Level.FINE, "[{0}] Terminated", getLogId());
834       channelz.removeRootChannel(this);
835       terminated = true;
836       terminatedLatch.countDown();
837       executorPool.returnObject(executor);
838       // Release the transport factory so that it can deallocate any resources.
839       transportFactory.close();
840     }
841   }
842 
843   @Override
getState(boolean requestConnection)844   public ConnectivityState getState(boolean requestConnection) {
845     ConnectivityState savedChannelState = channelStateManager.getState();
846     if (requestConnection && savedChannelState == IDLE) {
847       final class RequestConnection implements Runnable {
848         @Override
849         public void run() {
850           exitIdleMode();
851           if (subchannelPicker != null) {
852             subchannelPicker.requestConnection();
853           }
854         }
855       }
856 
857       channelExecutor.executeLater(new RequestConnection()).drain();
858     }
859     return savedChannelState;
860   }
861 
862   @Override
notifyWhenStateChanged(final ConnectivityState source, final Runnable callback)863   public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
864     final class NotifyStateChanged implements Runnable {
865       @Override
866       public void run() {
867         channelStateManager.notifyWhenStateChanged(callback, executor, source);
868       }
869     }
870 
871     channelExecutor.executeLater(new NotifyStateChanged()).drain();
872   }
873 
874   @Override
resetConnectBackoff()875   public void resetConnectBackoff() {
876     final class ResetConnectBackoff implements Runnable {
877       @Override
878       public void run() {
879         if (shutdown.get()) {
880           return;
881         }
882         if (nameResolverRefreshFuture != null) {
883           checkState(nameResolverStarted, "name resolver must be started");
884           cancelNameResolverBackoff();
885           nameResolver.refresh();
886         }
887         for (InternalSubchannel subchannel : subchannels) {
888           subchannel.resetConnectBackoff();
889         }
890         for (OobChannel oobChannel : oobChannels) {
891           oobChannel.resetConnectBackoff();
892         }
893       }
894     }
895 
896     channelExecutor.executeLater(new ResetConnectBackoff()).drain();
897   }
898 
899   @Override
enterIdle()900   public void enterIdle() {
901     final class PrepareToLoseNetworkRunnable implements Runnable {
902       @Override
903       public void run() {
904         if (shutdown.get() || lbHelper == null) {
905           return;
906         }
907         cancelIdleTimer(/* permanent= */ false);
908         enterIdleMode();
909       }
910     }
911 
912     channelExecutor.executeLater(new PrepareToLoseNetworkRunnable()).drain();
913   }
914 
915   /**
916    * A registry that prevents channel shutdown from killing existing retry attempts that are in
917    * backoff.
918    */
919   private final class UncommittedRetriableStreamsRegistry {
920     // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
921     // it's worthwhile to look for a lock-free approach.
922     final Object lock = new Object();
923 
924     @GuardedBy("lock")
925     Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
926 
927     @GuardedBy("lock")
928     Status shutdownStatus;
929 
onShutdown(Status reason)930     void onShutdown(Status reason) {
931       boolean shouldShutdownDelayedTransport = false;
932       synchronized (lock) {
933         if (shutdownStatus != null) {
934           return;
935         }
936         shutdownStatus = reason;
937         // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
938         // retriable streams, which may be in backoff and not using any transport, are already
939         // started RPCs.
940         if (uncommittedRetriableStreams.isEmpty()) {
941           shouldShutdownDelayedTransport = true;
942         }
943       }
944 
945       if (shouldShutdownDelayedTransport) {
946         delayedTransport.shutdown(reason);
947       }
948     }
949 
onShutdownNow(Status reason)950     void onShutdownNow(Status reason) {
951       onShutdown(reason);
952       Collection<ClientStream> streams;
953 
954       synchronized (lock) {
955         streams = new ArrayList<>(uncommittedRetriableStreams);
956       }
957 
958       for (ClientStream stream : streams) {
959         stream.cancel(reason);
960       }
961       delayedTransport.shutdownNow(reason);
962     }
963 
964     /**
965      * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
966      * shutdown Status.
967      */
968     @Nullable
add(RetriableStream<?> retriableStream)969     Status add(RetriableStream<?> retriableStream) {
970       synchronized (lock) {
971         if (shutdownStatus != null) {
972           return shutdownStatus;
973         }
974         uncommittedRetriableStreams.add(retriableStream);
975         return null;
976       }
977     }
978 
remove(RetriableStream<?> retriableStream)979     void remove(RetriableStream<?> retriableStream) {
980       Status shutdownStatusCopy = null;
981 
982       synchronized (lock) {
983         uncommittedRetriableStreams.remove(retriableStream);
984         if (uncommittedRetriableStreams.isEmpty()) {
985           shutdownStatusCopy = shutdownStatus;
986           // Because retriable transport is long-lived, we take this opportunity to down-size the
987           // hashmap.
988           uncommittedRetriableStreams = new HashSet<>();
989         }
990       }
991 
992       if (shutdownStatusCopy != null) {
993         delayedTransport.shutdown(shutdownStatusCopy);
994       }
995     }
996   }
997 
998   private class LbHelperImpl extends LoadBalancer.Helper {
999     LoadBalancer lb;
1000     final NameResolver nr;
1001 
LbHelperImpl(NameResolver nr)1002     LbHelperImpl(NameResolver nr) {
1003       this.nr = checkNotNull(nr, "NameResolver");
1004     }
1005 
1006     // Must be called from channelExecutor
handleInternalSubchannelState(ConnectivityStateInfo newState)1007     private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1008       if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1009         nr.refresh();
1010       }
1011     }
1012 
1013     @Override
createSubchannel( List<EquivalentAddressGroup> addressGroups, Attributes attrs)1014     public AbstractSubchannel createSubchannel(
1015         List<EquivalentAddressGroup> addressGroups, Attributes attrs) {
1016       checkNotNull(addressGroups, "addressGroups");
1017       checkNotNull(attrs, "attrs");
1018       // TODO(ejona): can we be even stricter? Like loadBalancer == null?
1019       checkState(!terminated, "Channel is terminated");
1020       final SubchannelImpl subchannel = new SubchannelImpl(attrs);
1021       ChannelTracer subchannelTracer = null;
1022       long subchannelCreationTime = timeProvider.currentTimeNanos();
1023       if (maxTraceEvents > 0) {
1024         subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel");
1025       }
1026 
1027       final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1028         // All callbacks are run in channelExecutor
1029         @Override
1030         void onTerminated(InternalSubchannel is) {
1031           subchannels.remove(is);
1032           channelz.removeSubchannel(is);
1033           maybeTerminateChannel();
1034         }
1035 
1036         @Override
1037         void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1038           handleInternalSubchannelState(newState);
1039           // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1040           if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
1041             lb.handleSubchannelState(subchannel, newState);
1042           }
1043         }
1044 
1045         @Override
1046         void onInUse(InternalSubchannel is) {
1047           inUseStateAggregator.updateObjectInUse(is, true);
1048         }
1049 
1050         @Override
1051         void onNotInUse(InternalSubchannel is) {
1052           inUseStateAggregator.updateObjectInUse(is, false);
1053         }
1054       }
1055 
1056       final InternalSubchannel internalSubchannel = new InternalSubchannel(
1057           addressGroups,
1058           authority(),
1059           userAgent,
1060           backoffPolicyProvider,
1061           transportFactory,
1062           transportFactory.getScheduledExecutorService(),
1063           stopwatchSupplier,
1064           channelExecutor,
1065           new ManagedInternalSubchannelCallback(),
1066           channelz,
1067           callTracerFactory.create(),
1068           subchannelTracer,
1069           timeProvider);
1070       if (channelTracer != null) {
1071         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1072             .setDescription("Child channel created")
1073             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1074             .setTimestampNanos(subchannelCreationTime)
1075             .setSubchannelRef(internalSubchannel)
1076             .build());
1077       }
1078       channelz.addSubchannel(internalSubchannel);
1079       subchannel.subchannel = internalSubchannel;
1080       logger.log(Level.FINE, "[{0}] {1} created for {2}",
1081           new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups});
1082 
1083       final class AddSubchannel implements Runnable {
1084         @Override
1085         public void run() {
1086           if (terminating) {
1087             // Because runSerialized() doesn't guarantee the runnable has been executed upon when
1088             // returning, the subchannel may still be returned to the balancer without being
1089             // shutdown even if "terminating" is already true.  The subchannel will not be used in
1090             // this case, because delayed transport has terminated when "terminating" becomes
1091             // true, and no more requests will be sent to balancer beyond this point.
1092             internalSubchannel.shutdown(SHUTDOWN_STATUS);
1093           }
1094           if (!terminated) {
1095             // If channel has not terminated, it will track the subchannel and block termination
1096             // for it.
1097             subchannels.add(internalSubchannel);
1098           }
1099         }
1100       }
1101 
1102       runSerialized(new AddSubchannel());
1103       return subchannel;
1104     }
1105 
1106     @Override
updateBalancingState( final ConnectivityState newState, final SubchannelPicker newPicker)1107     public void updateBalancingState(
1108         final ConnectivityState newState, final SubchannelPicker newPicker) {
1109       checkNotNull(newState, "newState");
1110       checkNotNull(newPicker, "newPicker");
1111       final class UpdateBalancingState implements Runnable {
1112         @Override
1113         public void run() {
1114           if (LbHelperImpl.this != lbHelper) {
1115             return;
1116           }
1117           updateSubchannelPicker(newPicker);
1118           // It's not appropriate to report SHUTDOWN state from lb.
1119           // Ignore the case of newState == SHUTDOWN for now.
1120           if (newState != SHUTDOWN) {
1121             if (channelTracer != null) {
1122               channelTracer.reportEvent(
1123                   new ChannelTrace.Event.Builder()
1124                       .setDescription("Entering " + newState + " state")
1125                       .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1126                       .setTimestampNanos(timeProvider.currentTimeNanos())
1127                       .build());
1128             }
1129             channelStateManager.gotoState(newState);
1130           }
1131         }
1132       }
1133 
1134       runSerialized(new UpdateBalancingState());
1135     }
1136 
1137     @Override
updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs)1138     public void updateSubchannelAddresses(
1139         LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
1140       checkArgument(subchannel instanceof SubchannelImpl,
1141           "subchannel must have been returned from createSubchannel");
1142       ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs);
1143     }
1144 
1145     @Override
createOobChannel(EquivalentAddressGroup addressGroup, String authority)1146     public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1147       // TODO(ejona): can we be even stricter? Like terminating?
1148       checkState(!terminated, "Channel is terminated");
1149       long oobChannelCreationTime = timeProvider.currentTimeNanos();
1150       ChannelTracer oobChannelTracer = null;
1151       ChannelTracer subchannelTracer = null;
1152       if (channelTracer != null) {
1153         oobChannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "OobChannel");
1154       }
1155       final OobChannel oobChannel = new OobChannel(
1156           authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
1157           channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1158       if (channelTracer != null) {
1159         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1160             .setDescription("Child channel created")
1161             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1162             .setTimestampNanos(oobChannelCreationTime)
1163             .setChannelRef(oobChannel)
1164             .build());
1165         subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel");
1166       }
1167       final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1168         @Override
1169         void onTerminated(InternalSubchannel is) {
1170           oobChannels.remove(oobChannel);
1171           channelz.removeSubchannel(is);
1172           oobChannel.handleSubchannelTerminated();
1173           maybeTerminateChannel();
1174         }
1175 
1176         @Override
1177         void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1178           handleInternalSubchannelState(newState);
1179           oobChannel.handleSubchannelStateChange(newState);
1180         }
1181       }
1182 
1183       final InternalSubchannel internalSubchannel = new InternalSubchannel(
1184           Collections.singletonList(addressGroup),
1185           authority, userAgent, backoffPolicyProvider, transportFactory,
1186           transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
1187           // All callback methods are run from channelExecutor
1188           new ManagedOobChannelCallback(),
1189           channelz,
1190           callTracerFactory.create(),
1191           subchannelTracer,
1192           timeProvider);
1193       if (oobChannelTracer != null) {
1194         oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1195             .setDescription("Child channel created")
1196             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1197             .setTimestampNanos(oobChannelCreationTime)
1198             .setSubchannelRef(internalSubchannel)
1199             .build());
1200       }
1201       channelz.addSubchannel(oobChannel);
1202       channelz.addSubchannel(internalSubchannel);
1203       oobChannel.setSubchannel(internalSubchannel);
1204       final class AddOobChannel implements Runnable {
1205         @Override
1206         public void run() {
1207           if (terminating) {
1208             oobChannel.shutdown();
1209           }
1210           if (!terminated) {
1211             // If channel has not terminated, it will track the subchannel and block termination
1212             // for it.
1213             oobChannels.add(oobChannel);
1214           }
1215         }
1216       }
1217 
1218       runSerialized(new AddOobChannel());
1219       return oobChannel;
1220     }
1221 
1222     @Override
updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag)1223     public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1224       checkArgument(channel instanceof OobChannel,
1225           "channel must have been returned from createOobChannel");
1226       ((OobChannel) channel).updateAddresses(eag);
1227     }
1228 
1229     @Override
getAuthority()1230     public String getAuthority() {
1231       return ManagedChannelImpl.this.authority();
1232     }
1233 
1234     @Override
getNameResolverFactory()1235     public NameResolver.Factory getNameResolverFactory() {
1236       return nameResolverFactory;
1237     }
1238 
1239     @Override
runSerialized(Runnable task)1240     public void runSerialized(Runnable task) {
1241       channelExecutor.executeLater(task).drain();
1242     }
1243   }
1244 
1245   private class NameResolverListenerImpl implements NameResolver.Listener {
1246     final LbHelperImpl helper;
1247 
NameResolverListenerImpl(LbHelperImpl helperImpl)1248     NameResolverListenerImpl(LbHelperImpl helperImpl) {
1249       this.helper = helperImpl;
1250     }
1251 
1252     @Override
onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config)1253     public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
1254       if (servers.isEmpty()) {
1255         onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
1256         return;
1257       }
1258       if (logger.isLoggable(Level.FINE)) {
1259         logger.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
1260             new Object[]{getLogId(), servers, config});
1261       }
1262 
1263       if (channelTracer != null && (haveBackends == null || !haveBackends)) {
1264         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1265             .setDescription("Address resolved: " + servers)
1266             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1267             .setTimestampNanos(timeProvider.currentTimeNanos())
1268             .build());
1269         haveBackends = true;
1270       }
1271       final Map<String, Object> serviceConfig =
1272           config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
1273       if (channelTracer != null && serviceConfig != null
1274           && !serviceConfig.equals(lastServiceConfig)) {
1275         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1276             .setDescription("Service config changed")
1277             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1278             .setTimestampNanos(timeProvider.currentTimeNanos())
1279             .build());
1280         lastServiceConfig = serviceConfig;
1281       }
1282 
1283       final class NamesResolved implements Runnable {
1284         @Override
1285         public void run() {
1286           // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1287           if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
1288             return;
1289           }
1290 
1291           nameResolverBackoffPolicy = null;
1292 
1293           if (serviceConfig != null) {
1294             try {
1295               serviceConfigInterceptor.handleUpdate(serviceConfig);
1296               if (retryEnabled) {
1297                 throttle = getThrottle(config);
1298               }
1299             } catch (RuntimeException re) {
1300               logger.log(
1301                   Level.WARNING,
1302                   "[" + getLogId() + "] Unexpected exception from parsing service config",
1303                   re);
1304             }
1305           }
1306 
1307           helper.lb.handleResolvedAddressGroups(servers, config);
1308         }
1309       }
1310 
1311       helper.runSerialized(new NamesResolved());
1312     }
1313 
1314     @Override
onError(final Status error)1315     public void onError(final Status error) {
1316       checkArgument(!error.isOk(), "the error status must not be OK");
1317       logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1318           new Object[] {getLogId(), error});
1319       if (channelTracer != null && (haveBackends == null || haveBackends)) {
1320         channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1321             .setDescription("Failed to resolve name")
1322             .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
1323             .setTimestampNanos(timeProvider.currentTimeNanos())
1324             .build());
1325         haveBackends = false;
1326       }
1327       final class NameResolverErrorHandler implements Runnable {
1328         @Override
1329         public void run() {
1330           // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1331           if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
1332             return;
1333           }
1334           helper.lb.handleNameResolutionError(error);
1335           if (nameResolverRefreshFuture != null) {
1336             // The name resolver may invoke onError multiple times, but we only want to
1337             // schedule one backoff attempt
1338             // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
1339             // want to reset the backoff interval upon repeated onError() calls
1340             return;
1341           }
1342           if (nameResolverBackoffPolicy == null) {
1343             nameResolverBackoffPolicy = backoffPolicyProvider.get();
1344           }
1345           long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
1346           if (logger.isLoggable(Level.FINE)) {
1347             logger.log(
1348                 Level.FINE,
1349                 "[{0}] Scheduling DNS resolution backoff for {1} ns",
1350                 new Object[] {logId, delayNanos});
1351           }
1352           nameResolverRefresh = new NameResolverRefresh();
1353           nameResolverRefreshFuture =
1354               transportFactory
1355                   .getScheduledExecutorService()
1356                   .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
1357         }
1358       }
1359 
1360       channelExecutor.executeLater(new NameResolverErrorHandler()).drain();
1361     }
1362   }
1363 
1364   @Nullable
getThrottle(Attributes config)1365   private static Throttle getThrottle(Attributes config) {
1366     return ServiceConfigUtil.getThrottlePolicy(
1367         config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG));
1368   }
1369 
1370   private final class SubchannelImpl extends AbstractSubchannel {
1371     // Set right after SubchannelImpl is created.
1372     InternalSubchannel subchannel;
1373     final Object shutdownLock = new Object();
1374     final Attributes attrs;
1375 
1376     @GuardedBy("shutdownLock")
1377     boolean shutdownRequested;
1378     @GuardedBy("shutdownLock")
1379     ScheduledFuture<?> delayedShutdownTask;
1380 
SubchannelImpl(Attributes attrs)1381     SubchannelImpl(Attributes attrs) {
1382       this.attrs = checkNotNull(attrs, "attrs");
1383     }
1384 
1385     @Override
obtainActiveTransport()1386     ClientTransport obtainActiveTransport() {
1387       return subchannel.obtainActiveTransport();
1388     }
1389 
1390     @Override
getInternalSubchannel()1391     InternalInstrumented<ChannelStats> getInternalSubchannel() {
1392       return subchannel;
1393     }
1394 
1395     @Override
shutdown()1396     public void shutdown() {
1397       synchronized (shutdownLock) {
1398         if (shutdownRequested) {
1399           if (terminating && delayedShutdownTask != null) {
1400             // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1401             // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1402             delayedShutdownTask.cancel(false);
1403             delayedShutdownTask = null;
1404             // Will fall through to the subchannel.shutdown() at the end.
1405           } else {
1406             return;
1407           }
1408         } else {
1409           shutdownRequested = true;
1410         }
1411         // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1412         // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1413         // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1414         // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1415         // shutdown of Subchannel for a few seconds here.
1416         //
1417         // TODO(zhangkun83): consider a better approach
1418         // (https://github.com/grpc/grpc-java/issues/2562).
1419         if (!terminating) {
1420           final class ShutdownSubchannel implements Runnable {
1421             @Override
1422             public void run() {
1423               subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1424             }
1425           }
1426 
1427           delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
1428               new LogExceptionRunnable(
1429                   new ShutdownSubchannel()), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
1430           return;
1431         }
1432       }
1433       // When terminating == true, no more real streams will be created. It's safe and also
1434       // desirable to shutdown timely.
1435       subchannel.shutdown(SHUTDOWN_STATUS);
1436     }
1437 
1438     @Override
requestConnection()1439     public void requestConnection() {
1440       subchannel.obtainActiveTransport();
1441     }
1442 
1443     @Override
getAllAddresses()1444     public List<EquivalentAddressGroup> getAllAddresses() {
1445       return subchannel.getAddressGroups();
1446     }
1447 
1448     @Override
getAttributes()1449     public Attributes getAttributes() {
1450       return attrs;
1451     }
1452 
1453     @Override
toString()1454     public String toString() {
1455       return subchannel.getLogId().toString();
1456     }
1457   }
1458 
1459   @Override
toString()1460   public String toString() {
1461     return MoreObjects.toStringHelper(this)
1462         .add("logId", logId.getId())
1463         .add("target", target)
1464         .toString();
1465   }
1466 
1467   private final class PanicChannelExecutor extends ChannelExecutor {
1468     @Override
handleUncaughtThrowable(Throwable t)1469     void handleUncaughtThrowable(Throwable t) {
1470       super.handleUncaughtThrowable(t);
1471       panic(t);
1472     }
1473   }
1474 
1475   /**
1476    * Called from channelExecutor.
1477    */
1478   private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1479     @Override
transportShutdown(Status s)1480     public void transportShutdown(Status s) {
1481       checkState(shutdown.get(), "Channel must have been shut down");
1482     }
1483 
1484     @Override
transportReady()1485     public void transportReady() {
1486       // Don't care
1487     }
1488 
1489     @Override
transportInUse(final boolean inUse)1490     public void transportInUse(final boolean inUse) {
1491       inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1492     }
1493 
1494     @Override
transportTerminated()1495     public void transportTerminated() {
1496       checkState(shutdown.get(), "Channel must have been shut down");
1497       terminating = true;
1498       shutdownNameResolverAndLoadBalancer(false);
1499       // No need to call channelStateManager since we are already in SHUTDOWN state.
1500       // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
1501       // here.
1502       maybeShutdownNowSubchannels();
1503       maybeTerminateChannel();
1504     }
1505   }
1506 
1507   /**
1508    * Must be accessed from channelExecutor.
1509    */
1510   private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1511     @Override
handleInUse()1512     void handleInUse() {
1513       exitIdleMode();
1514     }
1515 
1516     @Override
handleNotInUse()1517     void handleNotInUse() {
1518       if (shutdown.get()) {
1519         return;
1520       }
1521       rescheduleIdleTimer();
1522     }
1523   }
1524 }
1525