1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.ConnectivityState.CONNECTING;
22 import static io.grpc.ConnectivityState.IDLE;
23 import static io.grpc.ConnectivityState.READY;
24 import static io.grpc.ConnectivityState.SHUTDOWN;
25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26 import static junit.framework.TestCase.assertNotSame;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotEquals;
30 import static org.junit.Assert.assertNotNull;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.Matchers.any;
35 import static org.mockito.Matchers.anyObject;
36 import static org.mockito.Matchers.eq;
37 import static org.mockito.Matchers.same;
38 import static org.mockito.Mockito.atLeast;
39 import static org.mockito.Mockito.doAnswer;
40 import static org.mockito.Mockito.doThrow;
41 import static org.mockito.Mockito.inOrder;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.never;
44 import static org.mockito.Mockito.times;
45 import static org.mockito.Mockito.verify;
46 import static org.mockito.Mockito.verifyNoMoreInteractions;
47 import static org.mockito.Mockito.verifyZeroInteractions;
48 import static org.mockito.Mockito.when;
49 
50 import com.google.common.base.Throwables;
51 import com.google.common.collect.ImmutableList;
52 import com.google.common.collect.ImmutableMap;
53 import com.google.common.collect.Iterables;
54 import com.google.common.util.concurrent.ListenableFuture;
55 import com.google.common.util.concurrent.MoreExecutors;
56 import com.google.common.util.concurrent.SettableFuture;
57 import io.grpc.Attributes;
58 import io.grpc.BinaryLog;
59 import io.grpc.CallCredentials;
60 import io.grpc.CallOptions;
61 import io.grpc.Channel;
62 import io.grpc.ClientCall;
63 import io.grpc.ClientInterceptor;
64 import io.grpc.ClientInterceptors;
65 import io.grpc.ClientStreamTracer;
66 import io.grpc.ConnectivityState;
67 import io.grpc.ConnectivityStateInfo;
68 import io.grpc.Context;
69 import io.grpc.EquivalentAddressGroup;
70 import io.grpc.IntegerMarshaller;
71 import io.grpc.InternalChannelz;
72 import io.grpc.InternalChannelz.ChannelStats;
73 import io.grpc.InternalChannelz.ChannelTrace;
74 import io.grpc.InternalInstrumented;
75 import io.grpc.LoadBalancer;
76 import io.grpc.LoadBalancer.Helper;
77 import io.grpc.LoadBalancer.PickResult;
78 import io.grpc.LoadBalancer.PickSubchannelArgs;
79 import io.grpc.LoadBalancer.Subchannel;
80 import io.grpc.LoadBalancer.SubchannelPicker;
81 import io.grpc.ManagedChannel;
82 import io.grpc.Metadata;
83 import io.grpc.MethodDescriptor;
84 import io.grpc.MethodDescriptor.MethodType;
85 import io.grpc.NameResolver;
86 import io.grpc.SecurityLevel;
87 import io.grpc.ServerMethodDefinition;
88 import io.grpc.Status;
89 import io.grpc.StringMarshaller;
90 import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
91 import io.grpc.internal.TestUtils.MockClientTransportInfo;
92 import io.grpc.stub.ClientCalls;
93 import io.grpc.testing.TestMethodDescriptors;
94 import java.io.IOException;
95 import java.net.SocketAddress;
96 import java.net.URI;
97 import java.util.ArrayList;
98 import java.util.Arrays;
99 import java.util.Collections;
100 import java.util.HashMap;
101 import java.util.LinkedList;
102 import java.util.List;
103 import java.util.Map;
104 import java.util.Random;
105 import java.util.concurrent.BlockingQueue;
106 import java.util.concurrent.ExecutionException;
107 import java.util.concurrent.Executor;
108 import java.util.concurrent.TimeUnit;
109 import java.util.concurrent.atomic.AtomicBoolean;
110 import java.util.concurrent.atomic.AtomicLong;
111 import java.util.concurrent.atomic.AtomicReference;
112 import javax.annotation.Nullable;
113 import org.junit.After;
114 import org.junit.Assert;
115 import org.junit.Assume;
116 import org.junit.Before;
117 import org.junit.Rule;
118 import org.junit.Test;
119 import org.junit.rules.ExpectedException;
120 import org.junit.runner.RunWith;
121 import org.junit.runners.JUnit4;
122 import org.mockito.ArgumentCaptor;
123 import org.mockito.Captor;
124 import org.mockito.InOrder;
125 import org.mockito.Matchers;
126 import org.mockito.Mock;
127 import org.mockito.MockitoAnnotations;
128 import org.mockito.invocation.InvocationOnMock;
129 import org.mockito.stubbing.Answer;
130 
131 /** Unit tests for {@link ManagedChannelImpl}. */
132 @RunWith(JUnit4.class)
133 public class ManagedChannelImplTest {
134   private static final Attributes NAME_RESOLVER_PARAMS =
135       Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, 447).build();
136 
137   private static final MethodDescriptor<String, Integer> method =
138       MethodDescriptor.<String, Integer>newBuilder()
139           .setType(MethodType.UNKNOWN)
140           .setFullMethodName("service/method")
141           .setRequestMarshaller(new StringMarshaller())
142           .setResponseMarshaller(new IntegerMarshaller())
143           .build();
144   private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
145       Attributes.Key.create("subchannel-attr-key");
146   private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 10;
147   private static final String SERVICE_NAME = "fake.example.com";
148   private static final String AUTHORITY = SERVICE_NAME;
149   private static final String USER_AGENT = "userAgent";
150   private static final ClientTransportOptions clientTransportOptions =
151       new ClientTransportOptions()
152           .setAuthority(AUTHORITY)
153           .setUserAgent(USER_AGENT);
154   private static final String TARGET = "fake://" + SERVICE_NAME;
155   private URI expectedUri;
156   private final SocketAddress socketAddress = new SocketAddress() {};
157   private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
158   private final FakeClock timer = new FakeClock();
159   private final FakeClock executor = new FakeClock();
160   private final FakeClock oobExecutor = new FakeClock();
161   private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
162       new FakeClock.TaskFilter() {
163         @Override
164         public boolean shouldAccept(Runnable command) {
165           return command instanceof ManagedChannelImpl.NameResolverRefresh;
166         }
167       };
168 
169   private final InternalChannelz channelz = new InternalChannelz();
170 
171   @Rule public final ExpectedException thrown = ExpectedException.none();
172 
173   private ManagedChannelImpl channel;
174   private Helper helper;
175   @Captor
176   private ArgumentCaptor<Status> statusCaptor;
177   @Captor
178   private ArgumentCaptor<CallOptions> callOptionsCaptor;
179   @Mock
180   private LoadBalancer.Factory mockLoadBalancerFactory;
181   @Mock
182   private LoadBalancer mockLoadBalancer;
183 
184   @Captor
185   private ArgumentCaptor<ConnectivityStateInfo> stateInfoCaptor;
186   @Mock
187   private SubchannelPicker mockPicker;
188   @Mock
189   private ClientTransportFactory mockTransportFactory;
190   @Mock
191   private ClientCall.Listener<Integer> mockCallListener;
192   @Mock
193   private ClientCall.Listener<Integer> mockCallListener2;
194   @Mock
195   private ClientCall.Listener<Integer> mockCallListener3;
196   @Mock
197   private ClientCall.Listener<Integer> mockCallListener4;
198   @Mock
199   private ClientCall.Listener<Integer> mockCallListener5;
200   @Mock
201   private ObjectPool<Executor> executorPool;
202   @Mock
203   private ObjectPool<Executor> oobExecutorPool;
204   @Mock
205   private CallCredentials creds;
206   private ChannelBuilder channelBuilder;
207   private boolean requestConnection = true;
208   private BlockingQueue<MockClientTransportInfo> transports;
209 
210   private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
211       ArgumentCaptor.forClass(ClientStreamListener.class);
212 
createChannel(ClientInterceptor... interceptors)213   private void createChannel(ClientInterceptor... interceptors) {
214     checkState(channel == null);
215     TimeProvider fakeClockTimeProvider = new TimeProvider() {
216       @Override
217       public long currentTimeNanos() {
218         return timer.getTicker().read();
219       }
220     };
221 
222     channel = new ManagedChannelImpl(
223         channelBuilder, mockTransportFactory, new FakeBackoffPolicyProvider(),
224         oobExecutorPool, timer.getStopwatchSupplier(), Arrays.asList(interceptors),
225         fakeClockTimeProvider);
226 
227     if (requestConnection) {
228       int numExpectedTasks = 0;
229 
230       // Force-exit the initial idle-mode
231       channel.exitIdleMode();
232       if (channelBuilder.idleTimeoutMillis != ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE) {
233         numExpectedTasks += 1;
234       }
235 
236       if (getNameResolverRefresh() != null) {
237         numExpectedTasks += 1;
238       }
239 
240       assertEquals(numExpectedTasks, timer.numPendingTasks());
241 
242       ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
243       verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
244       helper = helperCaptor.getValue();
245     }
246   }
247 
248   @Before
setUp()249   public void setUp() throws Exception {
250     MockitoAnnotations.initMocks(this);
251     expectedUri = new URI(TARGET);
252     when(mockLoadBalancerFactory.newLoadBalancer(any(Helper.class))).thenReturn(mockLoadBalancer);
253     transports = TestUtils.captureTransports(mockTransportFactory);
254     when(mockTransportFactory.getScheduledExecutorService())
255         .thenReturn(timer.getScheduledExecutorService());
256     when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
257     when(oobExecutorPool.getObject()).thenReturn(oobExecutor.getScheduledExecutorService());
258 
259     channelBuilder = new ChannelBuilder()
260         .nameResolverFactory(new FakeNameResolverFactory.Builder(expectedUri).build())
261         .loadBalancerFactory(mockLoadBalancerFactory)
262         .userAgent(USER_AGENT)
263         .idleTimeout(AbstractManagedChannelImplBuilder.IDLE_MODE_MAX_TIMEOUT_DAYS, TimeUnit.DAYS);
264     channelBuilder.executorPool = executorPool;
265     channelBuilder.binlog = null;
266     channelBuilder.channelz = channelz;
267   }
268 
269   @After
allPendingTasksAreRun()270   public void allPendingTasksAreRun() throws Exception {
271     // The "never" verifications in the tests only hold up if all due tasks are done.
272     // As for timer, although there may be scheduled tasks in a future time, since we don't test
273     // any time-related behavior in this test suite, we only care the tasks that are due. This
274     // would ignore any time-sensitive tasks, e.g., back-off and the idle timer.
275     assertTrue(timer.getDueTasks() + " should be empty", timer.getDueTasks().isEmpty());
276     assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
277     if (channel != null) {
278       channel.shutdownNow();
279       channel = null;
280     }
281   }
282 
283   @Test
284   @SuppressWarnings("unchecked")
idleModeDisabled()285   public void idleModeDisabled() {
286     channelBuilder.nameResolverFactory(
287         new FakeNameResolverFactory.Builder(expectedUri)
288             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
289             .build());
290     createChannel();
291 
292     // In this test suite, the channel is always created with idle mode disabled.
293     // No task is scheduled to enter idle mode
294     assertEquals(0, timer.numPendingTasks());
295     assertEquals(0, executor.numPendingTasks());
296   }
297 
298   @Test
immediateDeadlineExceeded()299   public void immediateDeadlineExceeded() {
300     createChannel();
301     ClientCall<String, Integer> call =
302         channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS));
303     call.start(mockCallListener, new Metadata());
304     assertEquals(1, executor.runDueTasks());
305 
306     verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
307     Status status = statusCaptor.getValue();
308     assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
309   }
310 
311   @Test
shutdownWithNoTransportsEverCreated()312   public void shutdownWithNoTransportsEverCreated() {
313     channelBuilder.nameResolverFactory(
314         new FakeNameResolverFactory.Builder(expectedUri)
315             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
316             .build());
317     createChannel();
318     verify(executorPool).getObject();
319     verify(executorPool, never()).returnObject(anyObject());
320     verify(mockTransportFactory).getScheduledExecutorService();
321     verifyNoMoreInteractions(mockTransportFactory);
322     channel.shutdown();
323     assertTrue(channel.isShutdown());
324     assertTrue(channel.isTerminated());
325     verify(executorPool).returnObject(executor.getScheduledExecutorService());
326   }
327 
328   @Test
channelzMembership()329   public void channelzMembership() throws Exception {
330     createChannel();
331     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
332     assertFalse(channelz.containsSubchannel(channel.getLogId()));
333     channel.shutdownNow();
334     channel.awaitTermination(5, TimeUnit.SECONDS);
335     assertNull(channelz.getRootChannel(channel.getLogId().getId()));
336     assertFalse(channelz.containsSubchannel(channel.getLogId()));
337   }
338 
339   @Test
channelzMembership_subchannel()340   public void channelzMembership_subchannel() throws Exception {
341     createChannel();
342     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
343 
344     AbstractSubchannel subchannel =
345         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
346     // subchannels are not root channels
347     assertNull(channelz.getRootChannel(subchannel.getInternalSubchannel().getLogId().getId()));
348     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
349     assertThat(getStats(channel).subchannels)
350         .containsExactly(subchannel.getInternalSubchannel());
351 
352     subchannel.requestConnection();
353     MockClientTransportInfo transportInfo = transports.poll();
354     assertNotNull(transportInfo);
355     assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
356 
357     // terminate transport
358     transportInfo.listener.transportTerminated();
359     assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
360 
361     // terminate subchannel
362     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
363     subchannel.shutdown();
364     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
365     timer.runDueTasks();
366     assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
367     assertThat(getStats(channel).subchannels).isEmpty();
368 
369     // channel still appears
370     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
371   }
372 
373   @Test
channelzMembership_oob()374   public void channelzMembership_oob() throws Exception {
375     createChannel();
376     OobChannel oob = (OobChannel) helper.createOobChannel(addressGroup, AUTHORITY);
377     // oob channels are not root channels
378     assertNull(channelz.getRootChannel(oob.getLogId().getId()));
379     assertTrue(channelz.containsSubchannel(oob.getLogId()));
380     assertThat(getStats(channel).subchannels).containsExactly(oob);
381     assertTrue(channelz.containsSubchannel(oob.getLogId()));
382 
383     AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel();
384     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
385     assertThat(getStats(oob).subchannels)
386         .containsExactly(subchannel.getInternalSubchannel());
387     assertTrue(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
388 
389     oob.getSubchannel().requestConnection();
390     MockClientTransportInfo transportInfo = transports.poll();
391     assertNotNull(transportInfo);
392     assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId()));
393 
394     // terminate transport
395     transportInfo.listener.transportTerminated();
396     assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId()));
397 
398     // terminate oobchannel
399     oob.shutdown();
400     assertFalse(channelz.containsSubchannel(oob.getLogId()));
401     assertThat(getStats(channel).subchannels).isEmpty();
402     assertFalse(channelz.containsSubchannel(subchannel.getInternalSubchannel().getLogId()));
403 
404     // channel still appears
405     assertNotNull(channelz.getRootChannel(channel.getLogId().getId()));
406   }
407 
408   @Test
callsAndShutdown()409   public void callsAndShutdown() {
410     subtestCallsAndShutdown(false, false);
411   }
412 
413   @Test
callsAndShutdownNow()414   public void callsAndShutdownNow() {
415     subtestCallsAndShutdown(true, false);
416   }
417 
418   /** Make sure shutdownNow() after shutdown() has an effect. */
419   @Test
callsAndShutdownAndShutdownNow()420   public void callsAndShutdownAndShutdownNow() {
421     subtestCallsAndShutdown(false, true);
422   }
423 
subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown)424   private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAfterShutdown) {
425     FakeNameResolverFactory nameResolverFactory =
426         new FakeNameResolverFactory.Builder(expectedUri).build();
427     channelBuilder.nameResolverFactory(nameResolverFactory);
428     createChannel();
429     verify(executorPool).getObject();
430     ClientStream mockStream = mock(ClientStream.class);
431     ClientStream mockStream2 = mock(ClientStream.class);
432     Metadata headers = new Metadata();
433     Metadata headers2 = new Metadata();
434 
435     // Configure the picker so that first RPC goes to delayed transport, and second RPC goes to
436     // real transport.
437     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
438     subchannel.requestConnection();
439     verify(mockTransportFactory)
440         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
441     MockClientTransportInfo transportInfo = transports.poll();
442     ConnectionClientTransport mockTransport = transportInfo.transport;
443     verify(mockTransport).start(any(ManagedClientTransport.Listener.class));
444     ManagedClientTransport.Listener transportListener = transportInfo.listener;
445     when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT)))
446         .thenReturn(mockStream);
447     when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT)))
448         .thenReturn(mockStream2);
449     transportListener.transportReady();
450     when(mockPicker.pickSubchannel(
451         new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn(
452         PickResult.withNoResult());
453     when(mockPicker.pickSubchannel(
454         new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn(
455         PickResult.withSubchannel(subchannel));
456     helper.updateBalancingState(READY, mockPicker);
457 
458     // First RPC, will be pending
459     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
460     verify(mockTransportFactory)
461         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
462     call.start(mockCallListener, headers);
463 
464     verify(mockTransport, never())
465         .newStream(same(method), same(headers), same(CallOptions.DEFAULT));
466 
467     // Second RPC, will be assigned to the real transport
468     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
469     call2.start(mockCallListener2, headers2);
470     verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
471     verify(mockTransport).newStream(same(method), same(headers2), same(CallOptions.DEFAULT));
472     verify(mockStream2).start(any(ClientStreamListener.class));
473 
474     // Shutdown
475     if (shutdownNow) {
476       channel.shutdownNow();
477     } else {
478       channel.shutdown();
479       if (shutdownNowAfterShutdown) {
480         channel.shutdownNow();
481         shutdownNow = true;
482       }
483     }
484     assertTrue(channel.isShutdown());
485     assertFalse(channel.isTerminated());
486     assertEquals(1, nameResolverFactory.resolvers.size());
487     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
488 
489     // Further calls should fail without going to the transport
490     ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
491     call3.start(mockCallListener3, headers2);
492     timer.runDueTasks();
493     executor.runDueTasks();
494 
495     verify(mockCallListener3).onClose(statusCaptor.capture(), any(Metadata.class));
496     assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
497 
498     if (shutdownNow) {
499       // LoadBalancer and NameResolver are shut down as soon as delayed transport is terminated.
500       verify(mockLoadBalancer).shutdown();
501       assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
502       // call should have been aborted by delayed transport
503       executor.runDueTasks();
504       verify(mockCallListener).onClose(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS),
505           any(Metadata.class));
506     } else {
507       // LoadBalancer and NameResolver are still running.
508       verify(mockLoadBalancer, never()).shutdown();
509       assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
510       // call and call2 are still alive, and can still be assigned to a real transport
511       SubchannelPicker picker2 = mock(SubchannelPicker.class);
512       when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)))
513           .thenReturn(PickResult.withSubchannel(subchannel));
514       helper.updateBalancingState(READY, picker2);
515       executor.runDueTasks();
516       verify(mockTransport).newStream(same(method), same(headers), same(CallOptions.DEFAULT));
517       verify(mockStream).start(any(ClientStreamListener.class));
518     }
519 
520     // After call is moved out of delayed transport, LoadBalancer, NameResolver and the transports
521     // will be shutdown.
522     verify(mockLoadBalancer).shutdown();
523     assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
524 
525     if (shutdownNow) {
526       // Channel shutdownNow() all subchannels after shutting down LoadBalancer
527       verify(mockTransport).shutdownNow(ManagedChannelImpl.SHUTDOWN_NOW_STATUS);
528     } else {
529       verify(mockTransport, never()).shutdownNow(any(Status.class));
530     }
531     // LoadBalancer should shutdown the subchannel
532     subchannel.shutdown();
533     if (shutdownNow) {
534       verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_NOW_STATUS));
535     } else {
536       verify(mockTransport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
537     }
538 
539     // Killing the remaining real transport will terminate the channel
540     transportListener.transportShutdown(Status.UNAVAILABLE);
541     assertFalse(channel.isTerminated());
542     verify(executorPool, never()).returnObject(anyObject());
543     transportListener.transportTerminated();
544     assertTrue(channel.isTerminated());
545     verify(executorPool).returnObject(executor.getScheduledExecutorService());
546     verifyNoMoreInteractions(oobExecutorPool);
547 
548     verify(mockTransportFactory)
549         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
550     verify(mockTransportFactory).close();
551     verify(mockTransport, atLeast(0)).getLogId();
552     verifyNoMoreInteractions(mockTransport);
553   }
554 
555   @Test
noMoreCallbackAfterLoadBalancerShutdown()556   public void noMoreCallbackAfterLoadBalancerShutdown() {
557     FakeNameResolverFactory nameResolverFactory =
558         new FakeNameResolverFactory.Builder(expectedUri)
559             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
560             .build();
561     channelBuilder.nameResolverFactory(nameResolverFactory);
562     Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed");
563     createChannel();
564 
565     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
566     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
567     verify(mockLoadBalancer).handleResolvedAddressGroups(
568         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
569 
570     Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
571     Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
572     subchannel1.requestConnection();
573     subchannel2.requestConnection();
574     verify(mockTransportFactory, times(2))
575         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
576     MockClientTransportInfo transportInfo1 = transports.poll();
577     MockClientTransportInfo transportInfo2 = transports.poll();
578 
579     // LoadBalancer receives all sorts of callbacks
580     transportInfo1.listener.transportReady();
581     verify(mockLoadBalancer, times(2))
582         .handleSubchannelState(same(subchannel1), stateInfoCaptor.capture());
583     assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState());
584     assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState());
585 
586     verify(mockLoadBalancer)
587         .handleSubchannelState(same(subchannel2), stateInfoCaptor.capture());
588     assertSame(CONNECTING, stateInfoCaptor.getValue().getState());
589 
590     resolver.listener.onError(resolutionError);
591     verify(mockLoadBalancer).handleNameResolutionError(resolutionError);
592 
593     verifyNoMoreInteractions(mockLoadBalancer);
594 
595     channel.shutdown();
596     verify(mockLoadBalancer).shutdown();
597 
598     // No more callback should be delivered to LoadBalancer after it's shut down
599     transportInfo2.listener.transportReady();
600     resolver.listener.onError(resolutionError);
601     resolver.resolved();
602     verifyNoMoreInteractions(mockLoadBalancer);
603   }
604 
605   @Test
interceptor()606   public void interceptor() throws Exception {
607     final AtomicLong atomic = new AtomicLong();
608     ClientInterceptor interceptor = new ClientInterceptor() {
609       @Override
610       public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
611           MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions,
612           Channel next) {
613         atomic.set(1);
614         return next.newCall(method, callOptions);
615       }
616     };
617     createChannel(interceptor);
618     assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
619     assertEquals(1, atomic.get());
620   }
621 
622   @Test
callOptionsExecutor()623   public void callOptionsExecutor() {
624     Metadata headers = new Metadata();
625     ClientStream mockStream = mock(ClientStream.class);
626     FakeClock callExecutor = new FakeClock();
627     createChannel();
628 
629     // Start a call with a call executor
630     CallOptions options =
631         CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
632     ClientCall<String, Integer> call = channel.newCall(method, options);
633     call.start(mockCallListener, headers);
634 
635     // Make the transport available
636     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
637     verify(mockTransportFactory, never())
638         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
639     subchannel.requestConnection();
640     verify(mockTransportFactory)
641         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
642     MockClientTransportInfo transportInfo = transports.poll();
643     ConnectionClientTransport mockTransport = transportInfo.transport;
644     ManagedClientTransport.Listener transportListener = transportInfo.listener;
645     when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
646         .thenReturn(mockStream);
647     transportListener.transportReady();
648     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
649         .thenReturn(PickResult.withSubchannel(subchannel));
650     assertEquals(0, callExecutor.numPendingTasks());
651     helper.updateBalancingState(READY, mockPicker);
652 
653     // Real streams are started in the call executor if they were previously buffered.
654     assertEquals(1, callExecutor.runDueTasks());
655     verify(mockTransport).newStream(same(method), same(headers), same(options));
656     verify(mockStream).start(streamListenerCaptor.capture());
657 
658     // Call listener callbacks are also run in the call executor
659     ClientStreamListener streamListener = streamListenerCaptor.getValue();
660     Metadata trailers = new Metadata();
661     assertEquals(0, callExecutor.numPendingTasks());
662     streamListener.closed(Status.CANCELLED, trailers);
663     verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
664     assertEquals(1, callExecutor.runDueTasks());
665     verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
666 
667 
668     transportListener.transportShutdown(Status.UNAVAILABLE);
669     transportListener.transportTerminated();
670 
671     // Clean up as much as possible to allow the channel to terminate.
672     subchannel.shutdown();
673     timer.forwardNanos(
674         TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
675   }
676 
677   @Test
nameResolutionFailed()678   public void nameResolutionFailed() {
679     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
680     FakeNameResolverFactory nameResolverFactory =
681         new FakeNameResolverFactory.Builder(expectedUri)
682             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
683             .setError(error)
684             .build();
685     channelBuilder.nameResolverFactory(nameResolverFactory);
686     // Name resolution is started as soon as channel is created.
687     createChannel();
688     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
689     verify(mockLoadBalancer).handleNameResolutionError(same(error));
690     assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
691 
692     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
693     assertEquals(0, resolver.refreshCalled);
694 
695     timer.forwardNanos(1);
696     assertEquals(1, resolver.refreshCalled);
697     verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error));
698 
699     // Verify an additional name resolution failure does not schedule another timer
700     resolver.refresh();
701     verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error));
702     assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
703 
704     // Allow the next refresh attempt to succeed
705     resolver.error = null;
706 
707     // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2
708     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1);
709     assertEquals(2, resolver.refreshCalled);
710     timer.forwardNanos(1);
711     assertEquals(3, resolver.refreshCalled);
712     assertEquals(0, timer.numPendingTasks());
713 
714     // Verify that the successful resolution reset the backoff policy
715     resolver.listener.onError(error);
716     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
717     assertEquals(3, resolver.refreshCalled);
718     timer.forwardNanos(1);
719     assertEquals(4, resolver.refreshCalled);
720     assertEquals(0, timer.numPendingTasks());
721   }
722 
723   @Test
nameResolutionFailed_delayedTransportShutdownCancelsBackoff()724   public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
725     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
726 
727     FakeNameResolverFactory nameResolverFactory =
728         new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
729     channelBuilder.nameResolverFactory(nameResolverFactory);
730     // Name resolution is started as soon as channel is created.
731     createChannel();
732     verify(mockLoadBalancer).handleNameResolutionError(same(error));
733 
734     FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
735     assertNotNull(nameResolverBackoff);
736     assertFalse(nameResolverBackoff.isCancelled());
737 
738     // Add a pending call to the delayed transport
739     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
740     Metadata headers = new Metadata();
741     call.start(mockCallListener, headers);
742 
743     // The pending call on the delayed transport stops the name resolver backoff from cancelling
744     channel.shutdown();
745     assertFalse(nameResolverBackoff.isCancelled());
746 
747     // Notify that a subchannel is ready, which drains the delayed transport
748     SubchannelPicker picker = mock(SubchannelPicker.class);
749     Status status = Status.UNAVAILABLE.withDescription("for test");
750     when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
751         .thenReturn(PickResult.withDrop(status));
752     helper.updateBalancingState(READY, picker);
753     executor.runDueTasks();
754     verify(mockCallListener).onClose(same(status), any(Metadata.class));
755 
756     assertTrue(nameResolverBackoff.isCancelled());
757   }
758 
759   @Test
nameResolverReturnsEmptySubLists()760   public void nameResolverReturnsEmptySubLists() {
761     String errorDescription = "NameResolver returned an empty list";
762 
763     // Pass a FakeNameResolverFactory with an empty list
764     createChannel();
765 
766     // LoadBalancer received the error
767     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
768     verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
769     Status status = statusCaptor.getValue();
770     assertSame(Status.Code.UNAVAILABLE, status.getCode());
771     assertEquals(errorDescription, status.getDescription());
772   }
773 
774   @Test
loadBalancerThrowsInHandleResolvedAddresses()775   public void loadBalancerThrowsInHandleResolvedAddresses() {
776     RuntimeException ex = new RuntimeException("simulated");
777     // Delay the success of name resolution until allResolved() is called
778     FakeNameResolverFactory nameResolverFactory =
779         new FakeNameResolverFactory.Builder(expectedUri)
780             .setResolvedAtStart(false)
781             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
782             .build();
783     channelBuilder.nameResolverFactory(nameResolverFactory);
784     createChannel();
785 
786     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
787     doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups(
788         Matchers.<List<EquivalentAddressGroup>>anyObject(), any(Attributes.class));
789 
790     // NameResolver returns addresses.
791     nameResolverFactory.allResolved();
792 
793     // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode.
794     verifyPanicMode(ex);
795   }
796 
797   @Test
nameResolvedAfterChannelShutdown()798   public void nameResolvedAfterChannelShutdown() {
799     // Delay the success of name resolution until allResolved() is called.
800     FakeNameResolverFactory nameResolverFactory =
801         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build();
802     channelBuilder.nameResolverFactory(nameResolverFactory);
803     createChannel();
804 
805     channel.shutdown();
806 
807     assertTrue(channel.isShutdown());
808     assertTrue(channel.isTerminated());
809     verify(mockLoadBalancer).shutdown();
810     // Name resolved after the channel is shut down, which is possible if the name resolution takes
811     // time and is not cancellable. The resolved address will be dropped.
812     nameResolverFactory.allResolved();
813     verifyNoMoreInteractions(mockLoadBalancer);
814   }
815 
816   /**
817    * Verify that if the first resolved address points to a server that cannot be connected, the call
818    * will end up with the second address which works.
819    */
820   @Test
firstResolvedServerFailedToConnect()821   public void firstResolvedServerFailedToConnect() throws Exception {
822     final SocketAddress goodAddress = new SocketAddress() {
823         @Override public String toString() {
824           return "goodAddress";
825         }
826       };
827     final SocketAddress badAddress = new SocketAddress() {
828         @Override public String toString() {
829           return "badAddress";
830         }
831       };
832     InOrder inOrder = inOrder(mockLoadBalancer);
833 
834     List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress);
835     FakeNameResolverFactory nameResolverFactory =
836         new FakeNameResolverFactory.Builder(expectedUri)
837             .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
838             .build();
839     channelBuilder.nameResolverFactory(nameResolverFactory);
840     createChannel();
841 
842     // Start the call
843     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
844     Metadata headers = new Metadata();
845     call.start(mockCallListener, headers);
846     executor.runDueTasks();
847 
848     // Simulate name resolution results
849     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
850     inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
851         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
852     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
853     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
854         .thenReturn(PickResult.withSubchannel(subchannel));
855     subchannel.requestConnection();
856     inOrder.verify(mockLoadBalancer).handleSubchannelState(
857         same(subchannel), stateInfoCaptor.capture());
858     assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
859 
860     // The channel will starts with the first address (badAddress)
861     verify(mockTransportFactory)
862         .newClientTransport(same(badAddress), any(ClientTransportOptions.class));
863     verify(mockTransportFactory, times(0))
864         .newClientTransport(same(goodAddress), any(ClientTransportOptions.class));
865 
866     MockClientTransportInfo badTransportInfo = transports.poll();
867     // Which failed to connect
868     badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE);
869     inOrder.verifyNoMoreInteractions();
870 
871     // The channel then try the second address (goodAddress)
872     verify(mockTransportFactory)
873         .newClientTransport(same(goodAddress), any(ClientTransportOptions.class));
874     MockClientTransportInfo goodTransportInfo = transports.poll();
875     when(goodTransportInfo.transport.newStream(
876             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
877         .thenReturn(mock(ClientStream.class));
878 
879     goodTransportInfo.listener.transportReady();
880     inOrder.verify(mockLoadBalancer).handleSubchannelState(
881         same(subchannel), stateInfoCaptor.capture());
882     assertEquals(READY, stateInfoCaptor.getValue().getState());
883 
884     // A typical LoadBalancer will call this once the subchannel becomes READY
885     helper.updateBalancingState(READY, mockPicker);
886     // Delayed transport uses the app executor to create real streams.
887     executor.runDueTasks();
888 
889     verify(goodTransportInfo.transport).newStream(same(method), same(headers),
890         same(CallOptions.DEFAULT));
891     // The bad transport was never used.
892     verify(badTransportInfo.transport, times(0)).newStream(any(MethodDescriptor.class),
893         any(Metadata.class), any(CallOptions.class));
894   }
895 
896   @Test
failFastRpcFailFromErrorFromBalancer()897   public void failFastRpcFailFromErrorFromBalancer() {
898     subtestFailRpcFromBalancer(false, false, true);
899   }
900 
901   @Test
failFastRpcFailFromDropFromBalancer()902   public void failFastRpcFailFromDropFromBalancer() {
903     subtestFailRpcFromBalancer(false, true, true);
904   }
905 
906   @Test
waitForReadyRpcImmuneFromErrorFromBalancer()907   public void waitForReadyRpcImmuneFromErrorFromBalancer() {
908     subtestFailRpcFromBalancer(true, false, false);
909   }
910 
911   @Test
waitForReadyRpcFailFromDropFromBalancer()912   public void waitForReadyRpcFailFromDropFromBalancer() {
913     subtestFailRpcFromBalancer(true, true, true);
914   }
915 
subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail)916   private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
917     createChannel();
918 
919     // This call will be buffered by the channel, thus involve delayed transport
920     CallOptions callOptions = CallOptions.DEFAULT;
921     if (waitForReady) {
922       callOptions = callOptions.withWaitForReady();
923     } else {
924       callOptions = callOptions.withoutWaitForReady();
925     }
926     ClientCall<String, Integer> call1 = channel.newCall(method, callOptions);
927     call1.start(mockCallListener, new Metadata());
928 
929     SubchannelPicker picker = mock(SubchannelPicker.class);
930     Status status = Status.UNAVAILABLE.withDescription("for test");
931 
932     when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
933         .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status));
934     helper.updateBalancingState(READY, picker);
935 
936     executor.runDueTasks();
937     if (shouldFail) {
938       verify(mockCallListener).onClose(same(status), any(Metadata.class));
939     } else {
940       verifyZeroInteractions(mockCallListener);
941     }
942 
943     // This call doesn't involve delayed transport
944     ClientCall<String, Integer> call2 = channel.newCall(method, callOptions);
945     call2.start(mockCallListener2, new Metadata());
946 
947     executor.runDueTasks();
948     if (shouldFail) {
949       verify(mockCallListener2).onClose(same(status), any(Metadata.class));
950     } else {
951       verifyZeroInteractions(mockCallListener2);
952     }
953   }
954 
955   /**
956    * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a
957    * wait-for-ready call will still be buffered.
958    */
959   @Test
allServersFailedToConnect()960   public void allServersFailedToConnect() throws Exception {
961     final SocketAddress addr1 = new SocketAddress() {
962         @Override public String toString() {
963           return "addr1";
964         }
965       };
966     final SocketAddress addr2 = new SocketAddress() {
967         @Override public String toString() {
968           return "addr2";
969         }
970       };
971     InOrder inOrder = inOrder(mockLoadBalancer);
972 
973     List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2);
974 
975     FakeNameResolverFactory nameResolverFactory =
976         new FakeNameResolverFactory.Builder(expectedUri)
977             .setServers(Collections.singletonList(new EquivalentAddressGroup(resolvedAddrs)))
978             .build();
979     channelBuilder.nameResolverFactory(nameResolverFactory);
980     createChannel();
981 
982     // Start a wait-for-ready call
983     ClientCall<String, Integer> call =
984         channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
985     Metadata headers = new Metadata();
986     call.start(mockCallListener, headers);
987     // ... and a fail-fast call
988     ClientCall<String, Integer> call2 =
989         channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
990     call2.start(mockCallListener2, headers);
991     executor.runDueTasks();
992 
993     // Simulate name resolution results
994     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
995     inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
996         eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
997     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
998     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
999         .thenReturn(PickResult.withSubchannel(subchannel));
1000     subchannel.requestConnection();
1001 
1002     inOrder.verify(mockLoadBalancer).handleSubchannelState(
1003         same(subchannel), stateInfoCaptor.capture());
1004     assertEquals(CONNECTING, stateInfoCaptor.getValue().getState());
1005 
1006     // Connecting to server1, which will fail
1007     verify(mockTransportFactory)
1008         .newClientTransport(same(addr1), any(ClientTransportOptions.class));
1009     verify(mockTransportFactory, times(0))
1010         .newClientTransport(same(addr2), any(ClientTransportOptions.class));
1011     MockClientTransportInfo transportInfo1 = transports.poll();
1012     transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
1013 
1014     // Connecting to server2, which will fail too
1015     verify(mockTransportFactory)
1016         .newClientTransport(same(addr2), any(ClientTransportOptions.class));
1017     MockClientTransportInfo transportInfo2 = transports.poll();
1018     Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect");
1019     transportInfo2.listener.transportShutdown(server2Error);
1020 
1021     // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated
1022     // to LoadBalancer.
1023     inOrder.verify(mockLoadBalancer).handleSubchannelState(
1024         same(subchannel), stateInfoCaptor.capture());
1025     assertEquals(TRANSIENT_FAILURE, stateInfoCaptor.getValue().getState());
1026     assertSame(server2Error, stateInfoCaptor.getValue().getStatus());
1027 
1028     // A typical LoadBalancer would create a picker with error
1029     SubchannelPicker picker2 = mock(SubchannelPicker.class);
1030     when(picker2.pickSubchannel(any(PickSubchannelArgs.class)))
1031         .thenReturn(PickResult.withError(server2Error));
1032     helper.updateBalancingState(TRANSIENT_FAILURE, picker2);
1033     executor.runDueTasks();
1034 
1035     // ... which fails the fail-fast call
1036     verify(mockCallListener2).onClose(same(server2Error), any(Metadata.class));
1037     // ... while the wait-for-ready call stays
1038     verifyNoMoreInteractions(mockCallListener);
1039     // No real stream was ever created
1040     verify(transportInfo1.transport, times(0))
1041         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1042     verify(transportInfo2.transport, times(0))
1043         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1044   }
1045 
1046   @Test
subchannels()1047   public void subchannels() {
1048     createChannel();
1049 
1050     // createSubchannel() always return a new Subchannel
1051     Attributes attrs1 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr1").build();
1052     Attributes attrs2 = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, "attr2").build();
1053     Subchannel sub1 = helper.createSubchannel(addressGroup, attrs1);
1054     Subchannel sub2 = helper.createSubchannel(addressGroup, attrs2);
1055     assertNotSame(sub1, sub2);
1056     assertNotSame(attrs1, attrs2);
1057     assertSame(attrs1, sub1.getAttributes());
1058     assertSame(attrs2, sub2.getAttributes());
1059     assertSame(addressGroup, sub1.getAddresses());
1060     assertSame(addressGroup, sub2.getAddresses());
1061 
1062     // requestConnection()
1063     verify(mockTransportFactory, never())
1064         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1065     sub1.requestConnection();
1066     verify(mockTransportFactory).newClientTransport(socketAddress, clientTransportOptions);
1067     MockClientTransportInfo transportInfo1 = transports.poll();
1068     assertNotNull(transportInfo1);
1069 
1070     sub2.requestConnection();
1071     verify(mockTransportFactory, times(2))
1072         .newClientTransport(socketAddress, clientTransportOptions);
1073     MockClientTransportInfo transportInfo2 = transports.poll();
1074     assertNotNull(transportInfo2);
1075 
1076     sub1.requestConnection();
1077     sub2.requestConnection();
1078     verify(mockTransportFactory, times(2))
1079         .newClientTransport(socketAddress, clientTransportOptions);
1080 
1081     // shutdown() has a delay
1082     sub1.shutdown();
1083     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
1084     sub1.shutdown();
1085     verify(transportInfo1.transport, never()).shutdown(any(Status.class));
1086     timer.forwardTime(1, TimeUnit.SECONDS);
1087     verify(transportInfo1.transport).shutdown(same(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_STATUS));
1088 
1089     // ... but not after Channel is terminating
1090     verify(mockLoadBalancer, never()).shutdown();
1091     channel.shutdown();
1092     verify(mockLoadBalancer).shutdown();
1093     verify(transportInfo2.transport, never()).shutdown(any(Status.class));
1094 
1095     sub2.shutdown();
1096     verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS));
1097 
1098     // Cleanup
1099     transportInfo1.listener.transportShutdown(Status.UNAVAILABLE);
1100     transportInfo1.listener.transportTerminated();
1101     transportInfo2.listener.transportShutdown(Status.UNAVAILABLE);
1102     transportInfo2.listener.transportTerminated();
1103     timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
1104   }
1105 
1106   @Test
subchannelsWhenChannelShutdownNow()1107   public void subchannelsWhenChannelShutdownNow() {
1108     createChannel();
1109     Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1110     Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1111     sub1.requestConnection();
1112     sub2.requestConnection();
1113 
1114     assertEquals(2, transports.size());
1115     MockClientTransportInfo ti1 = transports.poll();
1116     MockClientTransportInfo ti2 = transports.poll();
1117 
1118     ti1.listener.transportReady();
1119     ti2.listener.transportReady();
1120 
1121     channel.shutdownNow();
1122     verify(ti1.transport).shutdownNow(any(Status.class));
1123     verify(ti2.transport).shutdownNow(any(Status.class));
1124 
1125     ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1126     ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1127     ti1.listener.transportTerminated();
1128 
1129     assertFalse(channel.isTerminated());
1130     ti2.listener.transportTerminated();
1131     assertTrue(channel.isTerminated());
1132   }
1133 
1134   @Test
subchannelsNoConnectionShutdown()1135   public void subchannelsNoConnectionShutdown() {
1136     createChannel();
1137     Subchannel sub1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1138     Subchannel sub2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1139 
1140     channel.shutdown();
1141     verify(mockLoadBalancer).shutdown();
1142     sub1.shutdown();
1143     assertFalse(channel.isTerminated());
1144     sub2.shutdown();
1145     assertTrue(channel.isTerminated());
1146     verify(mockTransportFactory, never())
1147         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1148   }
1149 
1150   @Test
subchannelsNoConnectionShutdownNow()1151   public void subchannelsNoConnectionShutdownNow() {
1152     createChannel();
1153     helper.createSubchannel(addressGroup, Attributes.EMPTY);
1154     helper.createSubchannel(addressGroup, Attributes.EMPTY);
1155     channel.shutdownNow();
1156 
1157     verify(mockLoadBalancer).shutdown();
1158     // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
1159     // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels.
1160     assertTrue(channel.isTerminated());
1161     verify(mockTransportFactory, never())
1162         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1163   }
1164 
1165   @Test
oobchannels()1166   public void oobchannels() {
1167     createChannel();
1168 
1169     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1authority");
1170     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2authority");
1171     verify(oobExecutorPool, times(2)).getObject();
1172 
1173     assertEquals("oob1authority", oob1.authority());
1174     assertEquals("oob2authority", oob2.authority());
1175 
1176     // OOB channels create connections lazily.  A new call will initiate the connection.
1177     Metadata headers = new Metadata();
1178     ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
1179     call.start(mockCallListener, headers);
1180     verify(mockTransportFactory)
1181         .newClientTransport(
1182             socketAddress,
1183             new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT));
1184     MockClientTransportInfo transportInfo = transports.poll();
1185     assertNotNull(transportInfo);
1186 
1187     assertEquals(0, oobExecutor.numPendingTasks());
1188     transportInfo.listener.transportReady();
1189     assertEquals(1, oobExecutor.runDueTasks());
1190     verify(transportInfo.transport).newStream(same(method), same(headers),
1191         same(CallOptions.DEFAULT));
1192 
1193     // The transport goes away
1194     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1195     transportInfo.listener.transportTerminated();
1196 
1197     // A new call will trigger a new transport
1198     ClientCall<String, Integer> call2 = oob1.newCall(method, CallOptions.DEFAULT);
1199     call2.start(mockCallListener2, headers);
1200     ClientCall<String, Integer> call3 =
1201         oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady());
1202     call3.start(mockCallListener3, headers);
1203     verify(mockTransportFactory, times(2)).newClientTransport(
1204         socketAddress,
1205         new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT));
1206     transportInfo = transports.poll();
1207     assertNotNull(transportInfo);
1208 
1209     // This transport fails
1210     Status transportError = Status.UNAVAILABLE.withDescription("Connection refused");
1211     assertEquals(0, oobExecutor.numPendingTasks());
1212     transportInfo.listener.transportShutdown(transportError);
1213     assertTrue(oobExecutor.runDueTasks() > 0);
1214 
1215     // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending
1216     verify(mockCallListener2).onClose(same(transportError), any(Metadata.class));
1217     verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
1218 
1219     // Shutdown
1220     assertFalse(oob1.isShutdown());
1221     assertFalse(oob2.isShutdown());
1222     oob1.shutdown();
1223     verify(oobExecutorPool, never()).returnObject(anyObject());
1224     oob2.shutdownNow();
1225     assertTrue(oob1.isShutdown());
1226     assertTrue(oob2.isShutdown());
1227     assertTrue(oob2.isTerminated());
1228     verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
1229 
1230     // New RPCs will be rejected.
1231     assertEquals(0, oobExecutor.numPendingTasks());
1232     ClientCall<String, Integer> call4 = oob1.newCall(method, CallOptions.DEFAULT);
1233     ClientCall<String, Integer> call5 = oob2.newCall(method, CallOptions.DEFAULT);
1234     call4.start(mockCallListener4, headers);
1235     call5.start(mockCallListener5, headers);
1236     assertTrue(oobExecutor.runDueTasks() > 0);
1237     verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class));
1238     Status status4 = statusCaptor.getValue();
1239     assertEquals(Status.Code.UNAVAILABLE, status4.getCode());
1240     verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class));
1241     Status status5 = statusCaptor.getValue();
1242     assertEquals(Status.Code.UNAVAILABLE, status5.getCode());
1243 
1244     // The pending RPC will still be pending
1245     verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class));
1246 
1247     // This will shutdownNow() the delayed transport, terminating the pending RPC
1248     assertEquals(0, oobExecutor.numPendingTasks());
1249     oob1.shutdownNow();
1250     assertTrue(oobExecutor.runDueTasks() > 0);
1251     verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class));
1252 
1253     // Shut down the channel, and it will not terminated because OOB channel has not.
1254     channel.shutdown();
1255     assertFalse(channel.isTerminated());
1256     // Delayed transport has already terminated.  Terminating the transport terminates the
1257     // subchannel, which in turn terimates the OOB channel, which terminates the channel.
1258     assertFalse(oob1.isTerminated());
1259     verify(oobExecutorPool).returnObject(oobExecutor.getScheduledExecutorService());
1260     transportInfo.listener.transportTerminated();
1261     assertTrue(oob1.isTerminated());
1262     assertTrue(channel.isTerminated());
1263     verify(oobExecutorPool, times(2)).returnObject(oobExecutor.getScheduledExecutorService());
1264   }
1265 
1266   @Test
oobChannelsWhenChannelShutdownNow()1267   public void oobChannelsWhenChannelShutdownNow() {
1268     createChannel();
1269     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
1270     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
1271 
1272     oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
1273     oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata());
1274 
1275     assertEquals(2, transports.size());
1276     MockClientTransportInfo ti1 = transports.poll();
1277     MockClientTransportInfo ti2 = transports.poll();
1278 
1279     ti1.listener.transportReady();
1280     ti2.listener.transportReady();
1281 
1282     channel.shutdownNow();
1283     verify(ti1.transport).shutdownNow(any(Status.class));
1284     verify(ti2.transport).shutdownNow(any(Status.class));
1285 
1286     ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1287     ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"));
1288     ti1.listener.transportTerminated();
1289 
1290     assertFalse(channel.isTerminated());
1291     ti2.listener.transportTerminated();
1292     assertTrue(channel.isTerminated());
1293   }
1294 
1295   @Test
oobChannelsNoConnectionShutdown()1296   public void oobChannelsNoConnectionShutdown() {
1297     createChannel();
1298     ManagedChannel oob1 = helper.createOobChannel(addressGroup, "oob1Authority");
1299     ManagedChannel oob2 = helper.createOobChannel(addressGroup, "oob2Authority");
1300     channel.shutdown();
1301 
1302     verify(mockLoadBalancer).shutdown();
1303     oob1.shutdown();
1304     assertTrue(oob1.isTerminated());
1305     assertFalse(channel.isTerminated());
1306     oob2.shutdown();
1307     assertTrue(oob2.isTerminated());
1308     assertTrue(channel.isTerminated());
1309     verify(mockTransportFactory, never())
1310         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1311   }
1312 
1313   @Test
oobChannelsNoConnectionShutdownNow()1314   public void oobChannelsNoConnectionShutdownNow() {
1315     createChannel();
1316     helper.createOobChannel(addressGroup, "oob1Authority");
1317     helper.createOobChannel(addressGroup, "oob2Authority");
1318     channel.shutdownNow();
1319 
1320     verify(mockLoadBalancer).shutdown();
1321     assertTrue(channel.isTerminated());
1322     // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels.
1323     // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels.
1324     verify(mockTransportFactory, never())
1325         .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
1326   }
1327 
1328   @Test
refreshNameResolutionWhenSubchannelConnectionFailed()1329   public void refreshNameResolutionWhenSubchannelConnectionFailed() {
1330     subtestRefreshNameResolutionWhenConnectionFailed(false);
1331   }
1332 
1333   @Test
refreshNameResolutionWhenOobChannelConnectionFailed()1334   public void refreshNameResolutionWhenOobChannelConnectionFailed() {
1335     subtestRefreshNameResolutionWhenConnectionFailed(true);
1336   }
1337 
subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel)1338   private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) {
1339     FakeNameResolverFactory nameResolverFactory =
1340         new FakeNameResolverFactory.Builder(expectedUri)
1341             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
1342             .build();
1343     channelBuilder.nameResolverFactory(nameResolverFactory);
1344     createChannel();
1345     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
1346 
1347     if (isOobChannel) {
1348       OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority");
1349       oobChannel.getSubchannel().requestConnection();
1350     } else {
1351       Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1352       subchannel.requestConnection();
1353     }
1354 
1355     MockClientTransportInfo transportInfo = transports.poll();
1356     assertNotNull(transportInfo);
1357 
1358     // Transport closed when connecting
1359     assertEquals(0, resolver.refreshCalled);
1360     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1361     assertEquals(1, resolver.refreshCalled);
1362 
1363     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
1364     transportInfo = transports.poll();
1365     assertNotNull(transportInfo);
1366 
1367     transportInfo.listener.transportReady();
1368 
1369     // Transport closed when ready
1370     assertEquals(1, resolver.refreshCalled);
1371     transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1372     assertEquals(2, resolver.refreshCalled);
1373   }
1374 
1375   @Test
uriPattern()1376   public void uriPattern() {
1377     assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches());
1378     assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("Z019+-.:/!@ #~ ").matches());
1379     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a/:").matches()); // "/:" not matched
1380     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("0a:/").matches()); // '0' not matched
1381     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher("a,:/").matches()); // ',' not matched
1382     assertFalse(ManagedChannelImpl.URI_PATTERN.matcher(" a:/").matches()); // space not matched
1383   }
1384 
1385   /**
1386    * Test that information such as the Call's context, MethodDescriptor, authority, executor are
1387    * propagated to newStream() and applyRequestMetadata().
1388    */
1389   @Test
1390   @SuppressWarnings("deprecation")
informationPropagatedToNewStreamAndCallCredentials()1391   public void informationPropagatedToNewStreamAndCallCredentials() {
1392     createChannel();
1393     CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
1394     final Context.Key<String> testKey = Context.key("testing");
1395     Context ctx = Context.current().withValue(testKey, "testValue");
1396     final LinkedList<Context> credsApplyContexts = new LinkedList<Context>();
1397     final LinkedList<Context> newStreamContexts = new LinkedList<Context>();
1398     doAnswer(new Answer<Void>() {
1399         @Override
1400         public Void answer(InvocationOnMock in) throws Throwable {
1401           credsApplyContexts.add(Context.current());
1402           return null;
1403         }
1404       }).when(creds).applyRequestMetadata(  // TODO(zhangkun83): remove suppression of deprecations
1405           any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1406           any(CallCredentials.MetadataApplier.class));
1407 
1408     // First call will be on delayed transport.  Only newCall() is run within the expected context,
1409     // so that we can verify that the context is explicitly attached before calling newStream() and
1410     // applyRequestMetadata(), which happens after we detach the context from the thread.
1411     Context origCtx = ctx.attach();
1412     assertEquals("testValue", testKey.get());
1413     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1414     ctx.detach(origCtx);
1415     assertNull(testKey.get());
1416     call.start(mockCallListener, new Metadata());
1417 
1418     // Simulate name resolution results
1419     EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
1420     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1421     subchannel.requestConnection();
1422     verify(mockTransportFactory)
1423         .newClientTransport(same(socketAddress), eq(clientTransportOptions));
1424     MockClientTransportInfo transportInfo = transports.poll();
1425     final ConnectionClientTransport transport = transportInfo.transport;
1426     when(transport.getAttributes()).thenReturn(Attributes.EMPTY);
1427     doAnswer(new Answer<ClientStream>() {
1428         @Override
1429         public ClientStream answer(InvocationOnMock in) throws Throwable {
1430           newStreamContexts.add(Context.current());
1431           return mock(ClientStream.class);
1432         }
1433       }).when(transport).newStream(
1434           any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1435 
1436     verify(creds, never()).applyRequestMetadata(
1437         any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
1438         any(CallCredentials.MetadataApplier.class));
1439 
1440     // applyRequestMetadata() is called after the transport becomes ready.
1441     transportInfo.listener.transportReady();
1442     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1443         .thenReturn(PickResult.withSubchannel(subchannel));
1444     helper.updateBalancingState(READY, mockPicker);
1445     executor.runDueTasks();
1446     ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
1447     ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor
1448         = ArgumentCaptor.forClass(CallCredentials.MetadataApplier.class);
1449     verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
1450         same(executor.getScheduledExecutorService()), applierCaptor.capture());
1451     assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1452     assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1453     assertEquals(SecurityLevel.NONE,
1454         attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1455     verify(transport, never()).newStream(
1456         any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1457 
1458     // newStream() is called after apply() is called
1459     applierCaptor.getValue().apply(new Metadata());
1460     verify(transport).newStream(same(method), any(Metadata.class), same(callOptions));
1461     assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1462     // The context should not live beyond the scope of newStream() and applyRequestMetadata()
1463     assertNull(testKey.get());
1464 
1465 
1466     // Second call will not be on delayed transport
1467     origCtx = ctx.attach();
1468     call = channel.newCall(method, callOptions);
1469     ctx.detach(origCtx);
1470     call.start(mockCallListener, new Metadata());
1471 
1472     verify(creds, times(2)).applyRequestMetadata(same(method), attrsCaptor.capture(),
1473         same(executor.getScheduledExecutorService()), applierCaptor.capture());
1474     assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
1475     assertEquals(AUTHORITY, attrsCaptor.getValue().get(CallCredentials.ATTR_AUTHORITY));
1476     assertEquals(SecurityLevel.NONE,
1477         attrsCaptor.getValue().get(CallCredentials.ATTR_SECURITY_LEVEL));
1478     // This is from the first call
1479     verify(transport).newStream(
1480         any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
1481 
1482     // Still, newStream() is called after apply() is called
1483     applierCaptor.getValue().apply(new Metadata());
1484     verify(transport, times(2)).newStream(same(method), any(Metadata.class), same(callOptions));
1485     assertEquals("testValue", testKey.get(newStreamContexts.poll()));
1486 
1487     assertNull(testKey.get());
1488   }
1489 
1490   @Test
pickerReturnsStreamTracer_noDelay()1491   public void pickerReturnsStreamTracer_noDelay() {
1492     ClientStream mockStream = mock(ClientStream.class);
1493     ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
1494     ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
1495     createChannel();
1496     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1497     subchannel.requestConnection();
1498     MockClientTransportInfo transportInfo = transports.poll();
1499     transportInfo.listener.transportReady();
1500     ClientTransport mockTransport = transportInfo.transport;
1501     when(mockTransport.newStream(
1502             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
1503         .thenReturn(mockStream);
1504 
1505     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
1506         PickResult.withSubchannel(subchannel, factory2));
1507     helper.updateBalancingState(READY, mockPicker);
1508 
1509     CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
1510     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1511     call.start(mockCallListener, new Metadata());
1512 
1513     verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
1514     verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
1515     assertEquals(
1516         Arrays.asList(factory1, factory2),
1517         callOptionsCaptor.getValue().getStreamTracerFactories());
1518     // The factories are safely not stubbed because we do not expect any usage of them.
1519     verifyZeroInteractions(factory1);
1520     verifyZeroInteractions(factory2);
1521   }
1522 
1523   @Test
pickerReturnsStreamTracer_delayed()1524   public void pickerReturnsStreamTracer_delayed() {
1525     ClientStream mockStream = mock(ClientStream.class);
1526     ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
1527     ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
1528     createChannel();
1529 
1530     CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
1531     ClientCall<String, Integer> call = channel.newCall(method, callOptions);
1532     call.start(mockCallListener, new Metadata());
1533 
1534     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1535     subchannel.requestConnection();
1536     MockClientTransportInfo transportInfo = transports.poll();
1537     transportInfo.listener.transportReady();
1538     ClientTransport mockTransport = transportInfo.transport;
1539     when(mockTransport.newStream(
1540             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
1541         .thenReturn(mockStream);
1542     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
1543         PickResult.withSubchannel(subchannel, factory2));
1544 
1545     helper.updateBalancingState(READY, mockPicker);
1546     assertEquals(1, executor.runDueTasks());
1547 
1548     verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
1549     verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
1550     assertEquals(
1551         Arrays.asList(factory1, factory2),
1552         callOptionsCaptor.getValue().getStreamTracerFactories());
1553     // The factories are safely not stubbed because we do not expect any usage of them.
1554     verifyZeroInteractions(factory1);
1555     verifyZeroInteractions(factory2);
1556   }
1557 
1558   @Test
getState_loadBalancerSupportsChannelState()1559   public void getState_loadBalancerSupportsChannelState() {
1560     channelBuilder.nameResolverFactory(
1561         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1562     createChannel();
1563     assertEquals(IDLE, channel.getState(false));
1564 
1565     helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
1566     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
1567   }
1568 
1569   @Test
getState_withRequestConnect()1570   public void getState_withRequestConnect() {
1571     channelBuilder.nameResolverFactory(
1572         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1573     requestConnection = false;
1574     createChannel();
1575 
1576     assertEquals(IDLE, channel.getState(false));
1577     verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class));
1578 
1579     // call getState() with requestConnection = true
1580     assertEquals(IDLE, channel.getState(true));
1581     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
1582     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
1583     helper = helperCaptor.getValue();
1584 
1585     helper.updateBalancingState(CONNECTING, mockPicker);
1586     assertEquals(CONNECTING, channel.getState(false));
1587     assertEquals(CONNECTING, channel.getState(true));
1588     verifyNoMoreInteractions(mockLoadBalancerFactory);
1589   }
1590 
1591   @Test
getState_withRequestConnect_IdleWithLbRunning()1592   public void getState_withRequestConnect_IdleWithLbRunning() {
1593     channelBuilder.nameResolverFactory(
1594         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1595     createChannel();
1596     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
1597 
1598     helper.updateBalancingState(IDLE, mockPicker);
1599 
1600     assertEquals(IDLE, channel.getState(true));
1601     verifyNoMoreInteractions(mockLoadBalancerFactory);
1602     verify(mockPicker).requestConnection();
1603   }
1604 
1605   @Test
notifyWhenStateChanged()1606   public void notifyWhenStateChanged() {
1607     final AtomicBoolean stateChanged = new AtomicBoolean();
1608     Runnable onStateChanged = new Runnable() {
1609       @Override
1610       public void run() {
1611         stateChanged.set(true);
1612       }
1613     };
1614 
1615     channelBuilder.nameResolverFactory(
1616         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1617     createChannel();
1618     assertEquals(IDLE, channel.getState(false));
1619 
1620     channel.notifyWhenStateChanged(IDLE, onStateChanged);
1621     executor.runDueTasks();
1622     assertFalse(stateChanged.get());
1623 
1624     // state change from IDLE to CONNECTING
1625     helper.updateBalancingState(CONNECTING, mockPicker);
1626     // onStateChanged callback should run
1627     executor.runDueTasks();
1628     assertTrue(stateChanged.get());
1629 
1630     // clear and test form CONNECTING
1631     stateChanged.set(false);
1632     channel.notifyWhenStateChanged(IDLE, onStateChanged);
1633     // onStateChanged callback should run immediately
1634     executor.runDueTasks();
1635     assertTrue(stateChanged.get());
1636   }
1637 
1638   @Test
channelStateWhenChannelShutdown()1639   public void channelStateWhenChannelShutdown() {
1640     final AtomicBoolean stateChanged = new AtomicBoolean();
1641     Runnable onStateChanged = new Runnable() {
1642       @Override
1643       public void run() {
1644         stateChanged.set(true);
1645       }
1646     };
1647 
1648     channelBuilder.nameResolverFactory(
1649         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
1650     createChannel();
1651     assertEquals(IDLE, channel.getState(false));
1652     channel.notifyWhenStateChanged(IDLE, onStateChanged);
1653     executor.runDueTasks();
1654     assertFalse(stateChanged.get());
1655 
1656     channel.shutdown();
1657     assertEquals(SHUTDOWN, channel.getState(false));
1658     executor.runDueTasks();
1659     assertTrue(stateChanged.get());
1660 
1661     stateChanged.set(false);
1662     channel.notifyWhenStateChanged(SHUTDOWN, onStateChanged);
1663     helper.updateBalancingState(CONNECTING, mockPicker);
1664 
1665     assertEquals(SHUTDOWN, channel.getState(false));
1666     executor.runDueTasks();
1667     assertFalse(stateChanged.get());
1668   }
1669 
1670   @Test
stateIsIdleOnIdleTimeout()1671   public void stateIsIdleOnIdleTimeout() {
1672     long idleTimeoutMillis = 2000L;
1673     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1674     createChannel();
1675     assertEquals(IDLE, channel.getState(false));
1676 
1677     helper.updateBalancingState(CONNECTING, mockPicker);
1678     assertEquals(CONNECTING, channel.getState(false));
1679 
1680     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1681     assertEquals(IDLE, channel.getState(false));
1682   }
1683 
1684   @Test
panic_whenIdle()1685   public void panic_whenIdle() {
1686     subtestPanic(IDLE);
1687   }
1688 
1689   @Test
panic_whenConnecting()1690   public void panic_whenConnecting() {
1691     subtestPanic(CONNECTING);
1692   }
1693 
1694   @Test
panic_whenTransientFailure()1695   public void panic_whenTransientFailure() {
1696     subtestPanic(TRANSIENT_FAILURE);
1697   }
1698 
1699   @Test
panic_whenReady()1700   public void panic_whenReady() {
1701     subtestPanic(READY);
1702   }
1703 
subtestPanic(ConnectivityState initialState)1704   private void subtestPanic(ConnectivityState initialState) {
1705     assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
1706     long idleTimeoutMillis = 2000L;
1707     FakeNameResolverFactory nameResolverFactory =
1708         new FakeNameResolverFactory.Builder(expectedUri).build();
1709     channelBuilder.nameResolverFactory(nameResolverFactory);
1710     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1711     createChannel();
1712 
1713     verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
1714     assertEquals(1, nameResolverFactory.resolvers.size());
1715     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);
1716 
1717     Throwable panicReason = new Exception("Simulated uncaught exception");
1718     if (initialState == IDLE) {
1719       timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1720     } else {
1721       helper.updateBalancingState(initialState, mockPicker);
1722     }
1723     assertEquals(initialState, channel.getState(false));
1724 
1725     if (initialState == IDLE) {
1726       // IDLE mode will shutdown resolver and balancer
1727       verify(mockLoadBalancer).shutdown();
1728       assertTrue(resolver.shutdown);
1729       // A new resolver is created
1730       assertEquals(1, nameResolverFactory.resolvers.size());
1731       resolver = nameResolverFactory.resolvers.remove(0);
1732       assertFalse(resolver.shutdown);
1733     } else {
1734       verify(mockLoadBalancer, never()).shutdown();
1735       assertFalse(resolver.shutdown);
1736     }
1737 
1738     // Make channel panic!
1739     channel.panic(panicReason);
1740 
1741     // Calls buffered in delayedTransport will fail
1742 
1743     // Resolver and balancer are shutdown
1744     verify(mockLoadBalancer).shutdown();
1745     assertTrue(resolver.shutdown);
1746 
1747     // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it.
1748     assertEquals(TRANSIENT_FAILURE, channel.getState(true));
1749     assertEquals(TRANSIENT_FAILURE, channel.getState(true));
1750     verifyPanicMode(panicReason);
1751 
1752     // No new resolver or balancer are created
1753     verifyNoMoreInteractions(mockLoadBalancerFactory);
1754     assertEquals(0, nameResolverFactory.resolvers.size());
1755 
1756     // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be
1757     // able to revive it.
1758     helper.updateBalancingState(READY, mockPicker);
1759     verifyPanicMode(panicReason);
1760 
1761     // Cannot be revived by exitIdleMode()
1762     channel.exitIdleMode();
1763     verifyPanicMode(panicReason);
1764 
1765     // Can still shutdown normally
1766     channel.shutdown();
1767     assertTrue(channel.isShutdown());
1768     assertTrue(channel.isTerminated());
1769     assertEquals(SHUTDOWN, channel.getState(false));
1770 
1771     // We didn't stub mockPicker, because it should have never been called in this test.
1772     verifyZeroInteractions(mockPicker);
1773   }
1774 
1775   @Test
panic_bufferedCallsWillFail()1776   public void panic_bufferedCallsWillFail() {
1777     createChannel();
1778 
1779     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1780         .thenReturn(PickResult.withNoResult());
1781     helper.updateBalancingState(CONNECTING, mockPicker);
1782 
1783     // Start RPCs that will be buffered in delayedTransport
1784     ClientCall<String, Integer> call =
1785         channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
1786     call.start(mockCallListener, new Metadata());
1787 
1788     ClientCall<String, Integer> call2 =
1789         channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
1790     call2.start(mockCallListener2, new Metadata());
1791 
1792     executor.runDueTasks();
1793     verifyZeroInteractions(mockCallListener, mockCallListener2);
1794 
1795     // Enter panic
1796     Throwable panicReason = new Exception("Simulated uncaught exception");
1797     channel.panic(panicReason);
1798 
1799     // Buffered RPCs fail immediately
1800     executor.runDueTasks();
1801     verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
1802     verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
1803   }
1804 
verifyPanicMode(Throwable cause)1805   private void verifyPanicMode(Throwable cause) {
1806     Assume.assumeTrue("Panic mode disabled to resolve issues with some tests. See #3293", false);
1807 
1808     @SuppressWarnings("unchecked")
1809     ClientCall.Listener<Integer> mockListener =
1810         (ClientCall.Listener<Integer>) mock(ClientCall.Listener.class);
1811     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
1812     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1813     call.start(mockListener, new Metadata());
1814     executor.runDueTasks();
1815     verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause);
1816 
1817     // Channel is dead.  No more pending task to possibly revive it.
1818     assertEquals(0, timer.numPendingTasks());
1819     assertEquals(0, executor.numPendingTasks());
1820     assertEquals(0, oobExecutor.numPendingTasks());
1821   }
1822 
verifyCallListenerClosed( ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause)1823   private void verifyCallListenerClosed(
1824       ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) {
1825     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null);
1826     verify(listener).onClose(captor.capture(), any(Metadata.class));
1827     Status rpcStatus = captor.getValue();
1828     assertEquals(code, rpcStatus.getCode());
1829     assertSame(cause, rpcStatus.getCause());
1830     verifyNoMoreInteractions(listener);
1831   }
1832 
1833   @Test
idleTimeoutAndReconnect()1834   public void idleTimeoutAndReconnect() {
1835     long idleTimeoutMillis = 2000L;
1836     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1837     createChannel();
1838 
1839     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1840     assertEquals(IDLE, channel.getState(true /* request connection */));
1841 
1842     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
1843     // Two times of requesting connection will create loadBalancer twice.
1844     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
1845     Helper helper2 = helperCaptor.getValue();
1846 
1847     // Updating on the old helper (whose balancer has been shutdown) does not change the channel
1848     // state.
1849     helper.updateBalancingState(CONNECTING, mockPicker);
1850     assertEquals(IDLE, channel.getState(false));
1851 
1852     helper2.updateBalancingState(CONNECTING, mockPicker);
1853     assertEquals(CONNECTING, channel.getState(false));
1854   }
1855 
1856   @Test
idleMode_resetsDelayedTransportPicker()1857   public void idleMode_resetsDelayedTransportPicker() {
1858     ClientStream mockStream = mock(ClientStream.class);
1859     Status pickError = Status.UNAVAILABLE.withDescription("pick result error");
1860     long idleTimeoutMillis = 1000L;
1861     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1862     channelBuilder.nameResolverFactory(
1863         new FakeNameResolverFactory.Builder(expectedUri)
1864             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
1865             .build());
1866     createChannel();
1867     assertEquals(IDLE, channel.getState(false));
1868 
1869     // This call will be buffered in delayedTransport
1870     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1871     call.start(mockCallListener, new Metadata());
1872 
1873     // Move channel into TRANSIENT_FAILURE, which will fail the pending call
1874     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1875         .thenReturn(PickResult.withError(pickError));
1876     helper.updateBalancingState(TRANSIENT_FAILURE, mockPicker);
1877     assertEquals(TRANSIENT_FAILURE, channel.getState(false));
1878     executor.runDueTasks();
1879     verify(mockCallListener).onClose(same(pickError), any(Metadata.class));
1880 
1881     // Move channel to idle
1882     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1883     assertEquals(IDLE, channel.getState(false));
1884 
1885     // This call should be buffered, but will move the channel out of idle
1886     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
1887     call2.start(mockCallListener2, new Metadata());
1888     executor.runDueTasks();
1889     verifyNoMoreInteractions(mockCallListener2);
1890 
1891     // Get the helper created on exiting idle
1892     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
1893     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
1894     Helper helper2 = helperCaptor.getValue();
1895 
1896     // Establish a connection
1897     Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
1898     subchannel.requestConnection();
1899     MockClientTransportInfo transportInfo = transports.poll();
1900     ConnectionClientTransport mockTransport = transportInfo.transport;
1901     ManagedClientTransport.Listener transportListener = transportInfo.listener;
1902     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
1903         .thenReturn(mockStream);
1904     transportListener.transportReady();
1905 
1906     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1907         .thenReturn(PickResult.withSubchannel(subchannel));
1908     helper2.updateBalancingState(READY, mockPicker);
1909     assertEquals(READY, channel.getState(false));
1910     executor.runDueTasks();
1911 
1912     // Verify the buffered call was drained
1913     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
1914     verify(mockStream).start(any(ClientStreamListener.class));
1915   }
1916 
1917   @Test
enterIdleEntersIdle()1918   public void enterIdleEntersIdle() {
1919     createChannel();
1920     helper.updateBalancingState(READY, mockPicker);
1921     assertEquals(READY, channel.getState(false));
1922 
1923     channel.enterIdle();
1924 
1925     assertEquals(IDLE, channel.getState(false));
1926   }
1927 
1928   @Test
enterIdleAfterIdleTimerIsNoOp()1929   public void enterIdleAfterIdleTimerIsNoOp() {
1930     long idleTimeoutMillis = 2000L;
1931     channelBuilder.idleTimeout(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1932     createChannel();
1933     timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
1934     assertEquals(IDLE, channel.getState(false));
1935 
1936     channel.enterIdle();
1937 
1938     assertEquals(IDLE, channel.getState(false));
1939   }
1940 
1941   @Test
enterIdle_exitsIdleIfDelayedStreamPending()1942   public void enterIdle_exitsIdleIfDelayedStreamPending() {
1943     FakeNameResolverFactory nameResolverFactory =
1944         new FakeNameResolverFactory.Builder(expectedUri)
1945             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
1946             .build();
1947     channelBuilder.nameResolverFactory(nameResolverFactory);
1948     createChannel();
1949 
1950     // Start a call that will be buffered in delayedTransport
1951     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1952     call.start(mockCallListener, new Metadata());
1953 
1954     // enterIdle() will shut down the name resolver and lb policy used to get a pick for the delayed
1955     // call
1956     channel.enterIdle();
1957     assertEquals(IDLE, channel.getState(false));
1958 
1959     // enterIdle() will restart the delayed call by exiting idle. This creates a new helper.
1960     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
1961     verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
1962     Helper helper2 = helperCaptor.getValue();
1963 
1964     // Establish a connection
1965     Subchannel subchannel = helper2.createSubchannel(addressGroup, Attributes.EMPTY);
1966     subchannel.requestConnection();
1967     ClientStream mockStream = mock(ClientStream.class);
1968     MockClientTransportInfo transportInfo = transports.poll();
1969     ConnectionClientTransport mockTransport = transportInfo.transport;
1970     ManagedClientTransport.Listener transportListener = transportInfo.listener;
1971     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
1972         .thenReturn(mockStream);
1973     transportListener.transportReady();
1974     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
1975         .thenReturn(PickResult.withSubchannel(subchannel));
1976     helper2.updateBalancingState(READY, mockPicker);
1977     assertEquals(READY, channel.getState(false));
1978 
1979     // Verify the original call was drained
1980     executor.runDueTasks();
1981     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
1982     verify(mockStream).start(any(ClientStreamListener.class));
1983   }
1984 
1985   @Test
updateBalancingStateDoesUpdatePicker()1986   public void updateBalancingStateDoesUpdatePicker() {
1987     ClientStream mockStream = mock(ClientStream.class);
1988     createChannel();
1989 
1990     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
1991     call.start(mockCallListener, new Metadata());
1992 
1993     // Make the transport available with subchannel2
1994     Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1995     Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY);
1996     subchannel2.requestConnection();
1997 
1998     MockClientTransportInfo transportInfo = transports.poll();
1999     ConnectionClientTransport mockTransport = transportInfo.transport;
2000     ManagedClientTransport.Listener transportListener = transportInfo.listener;
2001     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
2002         .thenReturn(mockStream);
2003     transportListener.transportReady();
2004 
2005     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
2006         .thenReturn(PickResult.withSubchannel(subchannel1));
2007     helper.updateBalancingState(READY, mockPicker);
2008 
2009     executor.runDueTasks();
2010     verify(mockTransport, never())
2011         .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
2012     verify(mockStream, never()).start(any(ClientStreamListener.class));
2013 
2014 
2015     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
2016         .thenReturn(PickResult.withSubchannel(subchannel2));
2017     helper.updateBalancingState(READY, mockPicker);
2018 
2019     executor.runDueTasks();
2020     verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
2021     verify(mockStream).start(any(ClientStreamListener.class));
2022   }
2023 
2024   @Test
updateBalancingStateWithShutdownShouldBeIgnored()2025   public void updateBalancingStateWithShutdownShouldBeIgnored() {
2026     channelBuilder.nameResolverFactory(
2027         new FakeNameResolverFactory.Builder(expectedUri).setResolvedAtStart(false).build());
2028     createChannel();
2029     assertEquals(IDLE, channel.getState(false));
2030 
2031     Runnable onStateChanged = mock(Runnable.class);
2032     channel.notifyWhenStateChanged(IDLE, onStateChanged);
2033 
2034     helper.updateBalancingState(SHUTDOWN, mockPicker);
2035 
2036     assertEquals(IDLE, channel.getState(false));
2037     executor.runDueTasks();
2038     verify(onStateChanged, never()).run();
2039   }
2040 
2041   @Test
resetConnectBackoff()2042   public void resetConnectBackoff() {
2043     // Start with a name resolution failure to trigger backoff attempts
2044     Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
2045     FakeNameResolverFactory nameResolverFactory =
2046         new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
2047     channelBuilder.nameResolverFactory(nameResolverFactory);
2048     // Name resolution is started as soon as channel is created.
2049     createChannel();
2050     FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
2051     verify(mockLoadBalancer).handleNameResolutionError(same(error));
2052 
2053     FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
2054     assertNotNull("There should be a name resolver backoff task", nameResolverBackoff);
2055     assertEquals(0, resolver.refreshCalled);
2056 
2057     // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff
2058     channel.resetConnectBackoff();
2059     assertEquals(1, resolver.refreshCalled);
2060     assertTrue(nameResolverBackoff.isCancelled());
2061 
2062     // Simulate a race between cancel and the task scheduler. Should be a no-op.
2063     nameResolverBackoff.command.run();
2064     assertEquals(1, resolver.refreshCalled);
2065 
2066     // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
2067     timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
2068     assertEquals(2, resolver.refreshCalled);
2069   }
2070 
2071   @Test
resetConnectBackoff_noOpWithoutPendingResolverBackoff()2072   public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
2073     FakeNameResolverFactory nameResolverFactory =
2074         new FakeNameResolverFactory.Builder(expectedUri)
2075             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2076             .build();
2077     channelBuilder.nameResolverFactory(nameResolverFactory);
2078     createChannel();
2079     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
2080     assertEquals(0, nameResolver.refreshCalled);
2081 
2082     channel.resetConnectBackoff();
2083 
2084     assertEquals(0, nameResolver.refreshCalled);
2085   }
2086 
2087   @Test
resetConnectBackoff_noOpWhenChannelShutdown()2088   public void resetConnectBackoff_noOpWhenChannelShutdown() {
2089     FakeNameResolverFactory nameResolverFactory =
2090         new FakeNameResolverFactory.Builder(expectedUri).build();
2091     channelBuilder.nameResolverFactory(nameResolverFactory);
2092     createChannel();
2093 
2094     channel.shutdown();
2095     assertTrue(channel.isShutdown());
2096     channel.resetConnectBackoff();
2097 
2098     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
2099     assertEquals(0, nameResolver.refreshCalled);
2100   }
2101 
2102   @Test
resetConnectBackoff_noOpWhenNameResolverNotStarted()2103   public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
2104     FakeNameResolverFactory nameResolverFactory =
2105         new FakeNameResolverFactory.Builder(expectedUri).build();
2106     channelBuilder.nameResolverFactory(nameResolverFactory);
2107     requestConnection = false;
2108     createChannel();
2109 
2110     channel.resetConnectBackoff();
2111 
2112     FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0);
2113     assertEquals(0, nameResolver.refreshCalled);
2114   }
2115 
2116   @Test
channelsAndSubchannels_instrumented_name()2117   public void channelsAndSubchannels_instrumented_name() throws Exception {
2118     createChannel();
2119     assertEquals(TARGET, getStats(channel).target);
2120 
2121     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
2122     assertEquals(Collections.singletonList(addressGroup).toString(),
2123         getStats((AbstractSubchannel) subchannel).target);
2124   }
2125 
2126   @Test
channelTracing_channelCreationEvent()2127   public void channelTracing_channelCreationEvent() throws Exception {
2128     timer.forwardNanos(1234);
2129     channelBuilder.maxTraceEvents(10);
2130     createChannel();
2131     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2132         .setDescription("Channel created")
2133         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2134         .setTimestampNanos(timer.getTicker().read())
2135         .build());
2136   }
2137 
2138   @Test
channelTracing_subchannelCreationEvents()2139   public void channelTracing_subchannelCreationEvents() throws Exception {
2140     channelBuilder.maxTraceEvents(10);
2141     createChannel();
2142     timer.forwardNanos(1234);
2143     AbstractSubchannel subchannel =
2144         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2145     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2146         .setDescription("Child channel created")
2147         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2148         .setTimestampNanos(timer.getTicker().read())
2149         .setSubchannelRef(subchannel.getInternalSubchannel())
2150         .build());
2151     assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2152         .setDescription("Subchannel created")
2153         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2154         .setTimestampNanos(timer.getTicker().read())
2155         .build());
2156   }
2157 
2158   @Test
channelTracing_nameResolvingErrorEvent()2159   public void channelTracing_nameResolvingErrorEvent() throws Exception {
2160     timer.forwardNanos(1234);
2161     channelBuilder.maxTraceEvents(10);
2162     createChannel();
2163     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2164         .setDescription("Failed to resolve name")
2165         .setSeverity(ChannelTrace.Event.Severity.CT_WARNING)
2166         .setTimestampNanos(timer.getTicker().read())
2167         .build());
2168   }
2169 
2170   @Test
channelTracing_nameResolvedEvent()2171   public void channelTracing_nameResolvedEvent() throws Exception {
2172     timer.forwardNanos(1234);
2173     channelBuilder.maxTraceEvents(10);
2174     FakeNameResolverFactory nameResolverFactory =
2175         new FakeNameResolverFactory.Builder(expectedUri)
2176             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2177             .build();
2178     channelBuilder.nameResolverFactory(nameResolverFactory);
2179     createChannel();
2180     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2181         .setDescription("Address resolved: "
2182             + Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2183         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2184         .setTimestampNanos(timer.getTicker().read())
2185         .build());
2186   }
2187 
2188   @Test
channelTracing_nameResolvedEvent_zeorAndNonzeroBackends()2189   public void channelTracing_nameResolvedEvent_zeorAndNonzeroBackends() throws Exception {
2190     timer.forwardNanos(1234);
2191     channelBuilder.maxTraceEvents(10);
2192     List<EquivalentAddressGroup> servers = new ArrayList<>();
2193     servers.add(new EquivalentAddressGroup(socketAddress));
2194     FakeNameResolverFactory nameResolverFactory =
2195         new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
2196     channelBuilder.nameResolverFactory(nameResolverFactory);
2197     createChannel();
2198 
2199     int prevSize = getStats(channel).channelTrace.events.size();
2200     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2201         Collections.singletonList(new EquivalentAddressGroup(
2202             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2203         Attributes.EMPTY);
2204     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
2205 
2206     prevSize = getStats(channel).channelTrace.events.size();
2207     nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
2208     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2209 
2210     prevSize = getStats(channel).channelTrace.events.size();
2211     nameResolverFactory.resolvers.get(0).listener.onError(Status.INTERNAL);
2212     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
2213 
2214     prevSize = getStats(channel).channelTrace.events.size();
2215     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2216         Collections.singletonList(new EquivalentAddressGroup(
2217             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2218         Attributes.EMPTY);
2219     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2220   }
2221 
2222   @Test
channelTracing_serviceConfigChange()2223   public void channelTracing_serviceConfigChange() throws Exception {
2224     timer.forwardNanos(1234);
2225     channelBuilder.maxTraceEvents(10);
2226     List<EquivalentAddressGroup> servers = new ArrayList<>();
2227     servers.add(new EquivalentAddressGroup(socketAddress));
2228     FakeNameResolverFactory nameResolverFactory =
2229         new FakeNameResolverFactory.Builder(expectedUri).setServers(servers).build();
2230     channelBuilder.nameResolverFactory(nameResolverFactory);
2231     createChannel();
2232 
2233     int prevSize = getStats(channel).channelTrace.events.size();
2234     Attributes attributes =
2235         Attributes.newBuilder()
2236             .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, new HashMap<String, Object>())
2237             .build();
2238     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2239         Collections.singletonList(new EquivalentAddressGroup(
2240             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2241         attributes);
2242     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2243     assertThat(getStats(channel).channelTrace.events.get(prevSize))
2244         .isEqualTo(new ChannelTrace.Event.Builder()
2245             .setDescription("Service config changed")
2246             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2247             .setTimestampNanos(timer.getTicker().read())
2248             .build());
2249 
2250     prevSize = getStats(channel).channelTrace.events.size();
2251     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2252         Collections.singletonList(new EquivalentAddressGroup(
2253             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2254         attributes);
2255     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize);
2256 
2257     prevSize = getStats(channel).channelTrace.events.size();
2258     Map<String, Object> serviceConfig = new HashMap<String, Object>();
2259     serviceConfig.put("methodConfig", new HashMap<String, Object>());
2260     attributes =
2261         Attributes.newBuilder()
2262             .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
2263             .build();
2264     timer.forwardNanos(1234);
2265     nameResolverFactory.resolvers.get(0).listener.onAddresses(
2266         Collections.singletonList(new EquivalentAddressGroup(
2267             Arrays.asList(new SocketAddress() {}, new SocketAddress() {}))),
2268         attributes);
2269     assertThat(getStats(channel).channelTrace.events).hasSize(prevSize + 1);
2270     assertThat(getStats(channel).channelTrace.events.get(prevSize))
2271         .isEqualTo(new ChannelTrace.Event.Builder()
2272             .setDescription("Service config changed")
2273             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2274             .setTimestampNanos(timer.getTicker().read())
2275             .build());
2276   }
2277 
2278   @Test
channelTracing_stateChangeEvent()2279   public void channelTracing_stateChangeEvent() throws Exception {
2280     channelBuilder.maxTraceEvents(10);
2281     createChannel();
2282     timer.forwardNanos(1234);
2283     helper.updateBalancingState(CONNECTING, mockPicker);
2284     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2285         .setDescription("Entering CONNECTING state")
2286         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2287         .setTimestampNanos(timer.getTicker().read())
2288         .build());
2289   }
2290 
2291   @Test
channelTracing_subchannelStateChangeEvent()2292   public void channelTracing_subchannelStateChangeEvent() throws Exception {
2293     channelBuilder.maxTraceEvents(10);
2294     createChannel();
2295     AbstractSubchannel subchannel =
2296         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2297     timer.forwardNanos(1234);
2298     subchannel.obtainActiveTransport();
2299     assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2300         .setDescription("Entering CONNECTING state")
2301         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2302         .setTimestampNanos(timer.getTicker().read())
2303         .build());
2304   }
2305 
2306   @Test
channelTracing_oobChannelStateChangeEvent()2307   public void channelTracing_oobChannelStateChangeEvent() throws Exception {
2308     channelBuilder.maxTraceEvents(10);
2309     createChannel();
2310     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
2311     timer.forwardNanos(1234);
2312     oobChannel.handleSubchannelStateChange(
2313         ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
2314     assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2315         .setDescription("Entering CONNECTING state")
2316         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2317         .setTimestampNanos(timer.getTicker().read())
2318         .build());
2319   }
2320 
2321   @Test
channelTracing_oobChannelCreationEvents()2322   public void channelTracing_oobChannelCreationEvents() throws Exception {
2323     channelBuilder.maxTraceEvents(10);
2324     createChannel();
2325     timer.forwardNanos(1234);
2326     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
2327     assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2328         .setDescription("Child channel created")
2329         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2330         .setTimestampNanos(timer.getTicker().read())
2331         .setChannelRef(oobChannel)
2332         .build());
2333     assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
2334         .setDescription("OobChannel created")
2335         .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2336         .setTimestampNanos(timer.getTicker().read())
2337         .build());
2338     assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains(
2339         new ChannelTrace.Event.Builder()
2340             .setDescription("Subchannel created")
2341             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
2342             .setTimestampNanos(timer.getTicker().read())
2343             .build());
2344   }
2345 
2346   @Test
channelsAndSubchannels_instrumented_state()2347   public void channelsAndSubchannels_instrumented_state() throws Exception {
2348     createChannel();
2349 
2350     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
2351     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
2352     helper = helperCaptor.getValue();
2353 
2354     assertEquals(IDLE, getStats(channel).state);
2355     helper.updateBalancingState(CONNECTING, mockPicker);
2356     assertEquals(CONNECTING, getStats(channel).state);
2357 
2358     AbstractSubchannel subchannel =
2359         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2360 
2361     assertEquals(IDLE, getStats(subchannel).state);
2362     subchannel.requestConnection();
2363     assertEquals(CONNECTING, getStats(subchannel).state);
2364 
2365     MockClientTransportInfo transportInfo = transports.poll();
2366 
2367     assertEquals(CONNECTING, getStats(subchannel).state);
2368     transportInfo.listener.transportReady();
2369     assertEquals(READY, getStats(subchannel).state);
2370 
2371     assertEquals(CONNECTING, getStats(channel).state);
2372     helper.updateBalancingState(READY, mockPicker);
2373     assertEquals(READY, getStats(channel).state);
2374 
2375     channel.shutdownNow();
2376     assertEquals(SHUTDOWN, getStats(channel).state);
2377     assertEquals(SHUTDOWN, getStats(subchannel).state);
2378   }
2379 
2380   @Test
channelStat_callStarted()2381   public void channelStat_callStarted() throws Exception {
2382     createChannel();
2383     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2384     assertEquals(0, getStats(channel).callsStarted);
2385     call.start(mockCallListener, new Metadata());
2386     assertEquals(1, getStats(channel).callsStarted);
2387     assertEquals(executor.getTicker().read(), getStats(channel).lastCallStartedNanos);
2388   }
2389 
2390   @Test
channelsAndSubChannels_instrumented_success()2391   public void channelsAndSubChannels_instrumented_success() throws Exception {
2392     channelsAndSubchannels_instrumented0(true);
2393   }
2394 
2395   @Test
channelsAndSubChannels_instrumented_fail()2396   public void channelsAndSubChannels_instrumented_fail() throws Exception {
2397     channelsAndSubchannels_instrumented0(false);
2398   }
2399 
channelsAndSubchannels_instrumented0(boolean success)2400   private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
2401     createChannel();
2402 
2403     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2404 
2405     // Channel stat bumped when ClientCall.start() called
2406     assertEquals(0, getStats(channel).callsStarted);
2407     call.start(mockCallListener, new Metadata());
2408     assertEquals(1, getStats(channel).callsStarted);
2409 
2410     ClientStream mockStream = mock(ClientStream.class);
2411     ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
2412     AbstractSubchannel subchannel =
2413         (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
2414     subchannel.requestConnection();
2415     MockClientTransportInfo transportInfo = transports.poll();
2416     transportInfo.listener.transportReady();
2417     ClientTransport mockTransport = transportInfo.transport;
2418     when(mockTransport.newStream(
2419             any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
2420         .thenReturn(mockStream);
2421     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
2422         PickResult.withSubchannel(subchannel, factory));
2423 
2424     // subchannel stat bumped when call gets assigned to it
2425     assertEquals(0, getStats(subchannel).callsStarted);
2426     helper.updateBalancingState(READY, mockPicker);
2427     assertEquals(1, executor.runDueTasks());
2428     verify(mockStream).start(streamListenerCaptor.capture());
2429     assertEquals(1, getStats(subchannel).callsStarted);
2430 
2431     ClientStreamListener streamListener = streamListenerCaptor.getValue();
2432     call.halfClose();
2433 
2434     // closing stream listener affects subchannel stats immediately
2435     assertEquals(0, getStats(subchannel).callsSucceeded);
2436     assertEquals(0, getStats(subchannel).callsFailed);
2437     streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
2438     if (success) {
2439       assertEquals(1, getStats(subchannel).callsSucceeded);
2440       assertEquals(0, getStats(subchannel).callsFailed);
2441     } else {
2442       assertEquals(0, getStats(subchannel).callsSucceeded);
2443       assertEquals(1, getStats(subchannel).callsFailed);
2444     }
2445 
2446     // channel stats bumped when the ClientCall.Listener is notified
2447     assertEquals(0, getStats(channel).callsSucceeded);
2448     assertEquals(0, getStats(channel).callsFailed);
2449     executor.runDueTasks();
2450     if (success) {
2451       assertEquals(1, getStats(channel).callsSucceeded);
2452       assertEquals(0, getStats(channel).callsFailed);
2453     } else {
2454       assertEquals(0, getStats(channel).callsSucceeded);
2455       assertEquals(1, getStats(channel).callsFailed);
2456     }
2457   }
2458 
2459   @Test
channelsAndSubchannels_oob_instrumented_success()2460   public void channelsAndSubchannels_oob_instrumented_success() throws Exception {
2461     channelsAndSubchannels_oob_instrumented0(true);
2462   }
2463 
2464   @Test
channelsAndSubchannels_oob_instrumented_fail()2465   public void channelsAndSubchannels_oob_instrumented_fail() throws Exception {
2466     channelsAndSubchannels_oob_instrumented0(false);
2467   }
2468 
channelsAndSubchannels_oob_instrumented0(boolean success)2469   private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
2470     // set up
2471     ClientStream mockStream = mock(ClientStream.class);
2472     createChannel();
2473 
2474     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
2475     AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
2476     FakeClock callExecutor = new FakeClock();
2477     CallOptions options =
2478         CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
2479     ClientCall<String, Integer> call = oobChannel.newCall(method, options);
2480     Metadata headers = new Metadata();
2481 
2482     // Channel stat bumped when ClientCall.start() called
2483     assertEquals(0, getStats(oobChannel).callsStarted);
2484     call.start(mockCallListener, headers);
2485     assertEquals(1, getStats(oobChannel).callsStarted);
2486 
2487     MockClientTransportInfo transportInfo = transports.poll();
2488     ConnectionClientTransport mockTransport = transportInfo.transport;
2489     ManagedClientTransport.Listener transportListener = transportInfo.listener;
2490     when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
2491         .thenReturn(mockStream);
2492 
2493     // subchannel stat bumped when call gets assigned to it
2494     assertEquals(0, getStats(oobSubchannel).callsStarted);
2495     transportListener.transportReady();
2496     callExecutor.runDueTasks();
2497     verify(mockStream).start(streamListenerCaptor.capture());
2498     assertEquals(1, getStats(oobSubchannel).callsStarted);
2499 
2500     ClientStreamListener streamListener = streamListenerCaptor.getValue();
2501     call.halfClose();
2502 
2503     // closing stream listener affects subchannel stats immediately
2504     assertEquals(0, getStats(oobSubchannel).callsSucceeded);
2505     assertEquals(0, getStats(oobSubchannel).callsFailed);
2506     streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
2507     if (success) {
2508       assertEquals(1, getStats(oobSubchannel).callsSucceeded);
2509       assertEquals(0, getStats(oobSubchannel).callsFailed);
2510     } else {
2511       assertEquals(0, getStats(oobSubchannel).callsSucceeded);
2512       assertEquals(1, getStats(oobSubchannel).callsFailed);
2513     }
2514 
2515     // channel stats bumped when the ClientCall.Listener is notified
2516     assertEquals(0, getStats(oobChannel).callsSucceeded);
2517     assertEquals(0, getStats(oobChannel).callsFailed);
2518     callExecutor.runDueTasks();
2519     if (success) {
2520       assertEquals(1, getStats(oobChannel).callsSucceeded);
2521       assertEquals(0, getStats(oobChannel).callsFailed);
2522     } else {
2523       assertEquals(0, getStats(oobChannel).callsSucceeded);
2524       assertEquals(1, getStats(oobChannel).callsFailed);
2525     }
2526     // oob channel is separate from the original channel
2527     assertEquals(0, getStats(channel).callsSucceeded);
2528     assertEquals(0, getStats(channel).callsFailed);
2529   }
2530 
2531   @Test
channelsAndSubchannels_oob_instrumented_name()2532   public void channelsAndSubchannels_oob_instrumented_name() throws Exception {
2533     createChannel();
2534 
2535     String authority = "oobauthority";
2536     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, authority);
2537     assertEquals(authority, getStats(oobChannel).target);
2538   }
2539 
2540   @Test
channelsAndSubchannels_oob_instrumented_state()2541   public void channelsAndSubchannels_oob_instrumented_state() throws Exception {
2542     createChannel();
2543 
2544     OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
2545     assertEquals(IDLE, getStats(oobChannel).state);
2546 
2547     oobChannel.getSubchannel().requestConnection();
2548     assertEquals(CONNECTING, getStats(oobChannel).state);
2549 
2550     MockClientTransportInfo transportInfo = transports.poll();
2551     ManagedClientTransport.Listener transportListener = transportInfo.listener;
2552 
2553     transportListener.transportReady();
2554     assertEquals(READY, getStats(oobChannel).state);
2555 
2556     // oobchannel state is separate from the ManagedChannel
2557     assertEquals(IDLE, getStats(channel).state);
2558     channel.shutdownNow();
2559     assertEquals(SHUTDOWN, getStats(channel).state);
2560     assertEquals(SHUTDOWN, getStats(oobChannel).state);
2561   }
2562 
2563   @Test
binaryLogInstalled()2564   public void binaryLogInstalled() throws Exception {
2565     final SettableFuture<Boolean> intercepted = SettableFuture.create();
2566     channelBuilder.binlog = new BinaryLog() {
2567       @Override
2568       public void close() throws IOException {
2569         // noop
2570       }
2571 
2572       @Override
2573       public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
2574           ServerMethodDefinition<ReqT, RespT> oMethodDef) {
2575         return oMethodDef;
2576       }
2577 
2578       @Override
2579       public Channel wrapChannel(Channel channel) {
2580         return ClientInterceptors.intercept(channel,
2581             new ClientInterceptor() {
2582               @Override
2583               public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
2584                   MethodDescriptor<ReqT, RespT> method,
2585                   CallOptions callOptions,
2586                   Channel next) {
2587                 intercepted.set(true);
2588                 return next.newCall(method, callOptions);
2589               }
2590             });
2591       }
2592     };
2593 
2594     createChannel();
2595     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2596     call.start(mockCallListener, new Metadata());
2597     assertTrue(intercepted.get());
2598   }
2599 
2600   @Test
retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail()2601   public void retryBackoffThenChannelShutdown_retryShouldStillHappen_newCallShouldFail() {
2602     Map<String, Object> retryPolicy = new HashMap<String, Object>();
2603     retryPolicy.put("maxAttempts", 3D);
2604     retryPolicy.put("initialBackoff", "10s");
2605     retryPolicy.put("maxBackoff", "30s");
2606     retryPolicy.put("backoffMultiplier", 2D);
2607     retryPolicy.put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"));
2608     Map<String, Object> methodConfig = new HashMap<String, Object>();
2609     Map<String, Object> name = new HashMap<String, Object>();
2610     name.put("service", "service");
2611     methodConfig.put("name", Arrays.<Object>asList(name));
2612     methodConfig.put("retryPolicy", retryPolicy);
2613     Map<String, Object> serviceConfig = new HashMap<String, Object>();
2614     serviceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
2615     Attributes attributesWithRetryPolicy = Attributes
2616         .newBuilder().set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
2617 
2618     FakeNameResolverFactory nameResolverFactory =
2619         new FakeNameResolverFactory.Builder(expectedUri)
2620             .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
2621             .build();
2622     nameResolverFactory.nextResolvedAttributes.set(attributesWithRetryPolicy);
2623     channelBuilder.nameResolverFactory(nameResolverFactory);
2624     channelBuilder.executor(MoreExecutors.directExecutor());
2625     channelBuilder.enableRetry();
2626     RetriableStream.setRandom(
2627         // not random
2628         new Random() {
2629           @Override
2630           public double nextDouble() {
2631             return 1D; // fake random
2632           }
2633         });
2634 
2635     requestConnection = false;
2636     createChannel();
2637 
2638     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
2639     call.start(mockCallListener, new Metadata());
2640     ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
2641     verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
2642     helper = helperCaptor.getValue();
2643     verify(mockLoadBalancer)
2644         .handleResolvedAddressGroups(nameResolverFactory.servers, attributesWithRetryPolicy);
2645 
2646     // simulating request connection and then transport ready after resolved address
2647     Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
2648     when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
2649         .thenReturn(PickResult.withSubchannel(subchannel));
2650     subchannel.requestConnection();
2651     MockClientTransportInfo transportInfo = transports.poll();
2652     ConnectionClientTransport mockTransport = transportInfo.transport;
2653     ClientStream mockStream = mock(ClientStream.class);
2654     ClientStream mockStream2 = mock(ClientStream.class);
2655     when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
2656         .thenReturn(mockStream).thenReturn(mockStream2);
2657     transportInfo.listener.transportReady();
2658     helper.updateBalancingState(READY, mockPicker);
2659 
2660     ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
2661         ArgumentCaptor.forClass(ClientStreamListener.class);
2662     verify(mockStream).start(streamListenerCaptor.capture());
2663     assertThat(timer.getPendingTasks()).isEmpty();
2664 
2665     // trigger retry
2666     streamListenerCaptor.getValue().closed(Status.UNAVAILABLE, new Metadata());
2667 
2668     // in backoff
2669     timer.forwardTime(5, TimeUnit.SECONDS);
2670     assertThat(timer.getPendingTasks()).hasSize(1);
2671     verify(mockStream2, never()).start(any(ClientStreamListener.class));
2672 
2673     // shutdown during backoff period
2674     channel.shutdown();
2675 
2676     assertThat(timer.getPendingTasks()).hasSize(1);
2677     verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
2678 
2679     ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
2680     call2.start(mockCallListener2, new Metadata());
2681 
2682     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
2683     verify(mockCallListener2).onClose(statusCaptor.capture(), any(Metadata.class));
2684     assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
2685     assertEquals("Channel shutdown invoked", statusCaptor.getValue().getDescription());
2686 
2687     // backoff ends
2688     timer.forwardTime(5, TimeUnit.SECONDS);
2689     assertThat(timer.getPendingTasks()).isEmpty();
2690     verify(mockStream2).start(streamListenerCaptor.capture());
2691     verify(mockLoadBalancer, never()).shutdown();
2692     assertFalse(
2693         "channel.isTerminated() is expected to be false but was true",
2694         channel.isTerminated());
2695 
2696     streamListenerCaptor.getValue().closed(Status.INTERNAL, new Metadata());
2697     verify(mockLoadBalancer).shutdown();
2698     // simulating the shutdown of load balancer triggers the shutdown of subchannel
2699     subchannel.shutdown();
2700     transportInfo.listener.transportTerminated(); // simulating transport terminated
2701     assertTrue(
2702         "channel.isTerminated() is expected to be true but was false",
2703         channel.isTerminated());
2704   }
2705 
2706   @Test
badServiceConfigIsRecoverable()2707   public void badServiceConfigIsRecoverable() throws Exception {
2708     final List<EquivalentAddressGroup> addresses =
2709         ImmutableList.of(new EquivalentAddressGroup(new SocketAddress() {}));
2710     final class FakeNameResolver extends NameResolver {
2711       Listener listener;
2712 
2713       @Override
2714       public String getServiceAuthority() {
2715         return "also fake";
2716       }
2717 
2718       @Override
2719       public void start(Listener listener) {
2720         this.listener = listener;
2721         listener.onAddresses(addresses,
2722             Attributes.newBuilder()
2723                 .set(
2724                     GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
2725                     ImmutableMap.<String, Object>of("loadBalancingPolicy", "kaboom"))
2726                 .build());
2727       }
2728 
2729       @Override
2730       public void shutdown() {}
2731     }
2732 
2733     final class FakeNameResolverFactory extends NameResolver.Factory {
2734       FakeNameResolver resolver;
2735 
2736       @Nullable
2737       @Override
2738       public NameResolver newNameResolver(URI targetUri, Attributes params) {
2739         return (resolver = new FakeNameResolver());
2740       }
2741 
2742       @Override
2743       public String getDefaultScheme() {
2744         return "fake";
2745       }
2746     }
2747 
2748     FakeNameResolverFactory factory = new FakeNameResolverFactory();
2749     final class CustomBuilder extends AbstractManagedChannelImplBuilder<CustomBuilder> {
2750 
2751       CustomBuilder() {
2752         super(TARGET);
2753         this.executorPool = ManagedChannelImplTest.this.executorPool;
2754         this.channelz = ManagedChannelImplTest.this.channelz;
2755       }
2756 
2757       @Override
2758       protected ClientTransportFactory buildTransportFactory() {
2759         return mockTransportFactory;
2760       }
2761     }
2762 
2763     ManagedChannel mychannel = new CustomBuilder()
2764         .nameResolverFactory(factory)
2765         .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory(null, null)).build();
2766 
2767     ClientCall<Void, Void> call1 =
2768         mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
2769     ListenableFuture<Void> future1 = ClientCalls.futureUnaryCall(call1, null);
2770     executor.runDueTasks();
2771     try {
2772       future1.get();
2773       Assert.fail();
2774     } catch (ExecutionException e) {
2775       assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("kaboom");
2776     }
2777 
2778     // ok the service config is bad, let's fix it.
2779 
2780     factory.resolver.listener.onAddresses(addresses,
2781         Attributes.newBuilder()
2782         .set(
2783             GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG,
2784             ImmutableMap.<String, Object>of("loadBalancingPolicy", "round_robin"))
2785         .build());
2786 
2787     ClientCall<Void, Void> call2 = mychannel.newCall(
2788         TestMethodDescriptors.voidMethod(),
2789         CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
2790     ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
2791 
2792     timer.forwardTime(1234, TimeUnit.SECONDS);
2793 
2794     executor.runDueTasks();
2795     try {
2796       future2.get();
2797       Assert.fail();
2798     } catch (ExecutionException e) {
2799       assertThat(Throwables.getStackTraceAsString(e.getCause())).contains("deadline");
2800     }
2801 
2802     mychannel.shutdownNow();
2803   }
2804 
2805   @Test
getAuthorityAfterShutdown()2806   public void getAuthorityAfterShutdown() throws Exception {
2807     createChannel();
2808     assertEquals(SERVICE_NAME, channel.authority());
2809     channel.shutdownNow().awaitTermination(1, TimeUnit.SECONDS);
2810     assertEquals(SERVICE_NAME, channel.authority());
2811   }
2812 
2813   private static final class ChannelBuilder
2814       extends AbstractManagedChannelImplBuilder<ChannelBuilder> {
2815 
ChannelBuilder()2816     ChannelBuilder() {
2817       super(TARGET);
2818     }
2819 
buildTransportFactory()2820     @Override protected ClientTransportFactory buildTransportFactory() {
2821       throw new UnsupportedOperationException();
2822     }
2823 
getNameResolverParams()2824     @Override protected Attributes getNameResolverParams() {
2825       return NAME_RESOLVER_PARAMS;
2826     }
2827   }
2828 
2829   private static final class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
2830     @Override
get()2831     public BackoffPolicy get() {
2832       return new BackoffPolicy() {
2833         int multiplier = 1;
2834 
2835         @Override
2836         public long nextBackoffNanos() {
2837           return RECONNECT_BACKOFF_INTERVAL_NANOS * multiplier++;
2838         }
2839       };
2840     }
2841   }
2842 
2843   private static final class FakeNameResolverFactory extends NameResolver.Factory {
2844     final URI expectedUri;
2845     final List<EquivalentAddressGroup> servers;
2846     final boolean resolvedAtStart;
2847     final Status error;
2848     final ArrayList<FakeNameResolver> resolvers = new ArrayList<>();
2849     // The Attributes argument of the next invocation of listener.onAddresses(servers, attrs)
2850     final AtomicReference<Attributes> nextResolvedAttributes =
2851         new AtomicReference<Attributes>(Attributes.EMPTY);
2852 
FakeNameResolverFactory( URI expectedUri, List<EquivalentAddressGroup> servers, boolean resolvedAtStart, Status error)2853     FakeNameResolverFactory(
2854         URI expectedUri,
2855         List<EquivalentAddressGroup> servers,
2856         boolean resolvedAtStart,
2857         Status error) {
2858       this.expectedUri = expectedUri;
2859       this.servers = servers;
2860       this.resolvedAtStart = resolvedAtStart;
2861       this.error = error;
2862     }
2863 
2864     @Override
newNameResolver(final URI targetUri, Attributes params)2865     public NameResolver newNameResolver(final URI targetUri, Attributes params) {
2866       if (!expectedUri.equals(targetUri)) {
2867         return null;
2868       }
2869       assertSame(NAME_RESOLVER_PARAMS, params);
2870       FakeNameResolver resolver = new FakeNameResolver(error);
2871       resolvers.add(resolver);
2872       return resolver;
2873     }
2874 
2875     @Override
getDefaultScheme()2876     public String getDefaultScheme() {
2877       return "fake";
2878     }
2879 
allResolved()2880     void allResolved() {
2881       for (FakeNameResolver resolver : resolvers) {
2882         resolver.resolved();
2883       }
2884     }
2885 
2886     final class FakeNameResolver extends NameResolver {
2887       Listener listener;
2888       boolean shutdown;
2889       int refreshCalled;
2890       Status error;
2891 
FakeNameResolver(Status error)2892       FakeNameResolver(Status error) {
2893         this.error = error;
2894       }
2895 
getServiceAuthority()2896       @Override public String getServiceAuthority() {
2897         return expectedUri.getAuthority();
2898       }
2899 
start(final Listener listener)2900       @Override public void start(final Listener listener) {
2901         this.listener = listener;
2902         if (resolvedAtStart) {
2903           resolved();
2904         }
2905       }
2906 
refresh()2907       @Override public void refresh() {
2908         assertNotNull(listener);
2909         refreshCalled++;
2910         resolved();
2911       }
2912 
resolved()2913       void resolved() {
2914         if (error != null) {
2915           listener.onError(error);
2916           return;
2917         }
2918         listener.onAddresses(servers, nextResolvedAttributes.get());
2919       }
2920 
shutdown()2921       @Override public void shutdown() {
2922         shutdown = true;
2923       }
2924     }
2925 
2926     static final class Builder {
2927       final URI expectedUri;
2928       List<EquivalentAddressGroup> servers = ImmutableList.<EquivalentAddressGroup>of();
2929       boolean resolvedAtStart = true;
2930       Status error = null;
2931 
Builder(URI expectedUri)2932       Builder(URI expectedUri) {
2933         this.expectedUri = expectedUri;
2934       }
2935 
setServers(List<EquivalentAddressGroup> servers)2936       Builder setServers(List<EquivalentAddressGroup> servers) {
2937         this.servers = servers;
2938         return this;
2939       }
2940 
setResolvedAtStart(boolean resolvedAtStart)2941       Builder setResolvedAtStart(boolean resolvedAtStart) {
2942         this.resolvedAtStart = resolvedAtStart;
2943         return this;
2944       }
2945 
setError(Status error)2946       Builder setError(Status error) {
2947         this.error = error;
2948         return this;
2949       }
2950 
build()2951       FakeNameResolverFactory build() {
2952         return new FakeNameResolverFactory(expectedUri, servers, resolvedAtStart, error);
2953       }
2954     }
2955   }
2956 
getStats(AbstractSubchannel subchannel)2957   private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
2958     return subchannel.getInternalSubchannel().getStats().get();
2959   }
2960 
getStats( InternalInstrumented<ChannelStats> instrumented)2961   private static ChannelStats getStats(
2962       InternalInstrumented<ChannelStats> instrumented) throws Exception {
2963     return instrumented.getStats().get();
2964   }
2965 
getNameResolverRefresh()2966   private FakeClock.ScheduledTask getNameResolverRefresh() {
2967     return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
2968   }
2969 }
2970