1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.ConnectivityState.CONNECTING;
21 import static io.grpc.ConnectivityState.IDLE;
22 import static io.grpc.ConnectivityState.READY;
23 import static io.grpc.ConnectivityState.SHUTDOWN;
24 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25 import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY;
26 import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.AdditionalAnswers.delegatesTo;
33 import static org.mockito.Matchers.any;
34 import static org.mockito.Matchers.eq;
35 import static org.mockito.Matchers.same;
36 import static org.mockito.Mockito.atLeast;
37 import static org.mockito.Mockito.doAnswer;
38 import static org.mockito.Mockito.inOrder;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.verifyNoMoreInteractions;
44 import static org.mockito.Mockito.when;
45 
46 import com.google.common.collect.Iterables;
47 import com.google.common.util.concurrent.MoreExecutors;
48 import com.google.protobuf.ByteString;
49 import com.google.protobuf.util.Durations;
50 import com.google.protobuf.util.Timestamps;
51 import io.grpc.Attributes;
52 import io.grpc.CallOptions;
53 import io.grpc.ClientStreamTracer;
54 import io.grpc.ConnectivityState;
55 import io.grpc.ConnectivityStateInfo;
56 import io.grpc.EquivalentAddressGroup;
57 import io.grpc.LoadBalancer.Helper;
58 import io.grpc.LoadBalancer.PickResult;
59 import io.grpc.LoadBalancer.PickSubchannelArgs;
60 import io.grpc.LoadBalancer.Subchannel;
61 import io.grpc.LoadBalancer.SubchannelPicker;
62 import io.grpc.ManagedChannel;
63 import io.grpc.Metadata;
64 import io.grpc.Status;
65 import io.grpc.grpclb.GrpclbState.BackendEntry;
66 import io.grpc.grpclb.GrpclbState.DropEntry;
67 import io.grpc.grpclb.GrpclbState.ErrorEntry;
68 import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
69 import io.grpc.inprocess.InProcessChannelBuilder;
70 import io.grpc.inprocess.InProcessServerBuilder;
71 import io.grpc.internal.BackoffPolicy;
72 import io.grpc.internal.FakeClock;
73 import io.grpc.internal.GrpcAttributes;
74 import io.grpc.internal.ObjectPool;
75 import io.grpc.internal.SerializingExecutor;
76 import io.grpc.internal.TimeProvider;
77 import io.grpc.lb.v1.ClientStats;
78 import io.grpc.lb.v1.ClientStatsPerToken;
79 import io.grpc.lb.v1.InitialLoadBalanceRequest;
80 import io.grpc.lb.v1.InitialLoadBalanceResponse;
81 import io.grpc.lb.v1.LoadBalanceRequest;
82 import io.grpc.lb.v1.LoadBalanceResponse;
83 import io.grpc.lb.v1.LoadBalancerGrpc;
84 import io.grpc.lb.v1.Server;
85 import io.grpc.lb.v1.ServerList;
86 import io.grpc.stub.StreamObserver;
87 import java.net.InetSocketAddress;
88 import java.net.SocketAddress;
89 import java.util.ArrayList;
90 import java.util.Arrays;
91 import java.util.Collections;
92 import java.util.LinkedList;
93 import java.util.List;
94 import java.util.concurrent.ScheduledExecutorService;
95 import java.util.concurrent.TimeUnit;
96 import javax.annotation.Nullable;
97 import org.junit.After;
98 import org.junit.Before;
99 import org.junit.Test;
100 import org.junit.runner.RunWith;
101 import org.junit.runners.JUnit4;
102 import org.mockito.ArgumentCaptor;
103 import org.mockito.Captor;
104 import org.mockito.InOrder;
105 import org.mockito.Mock;
106 import org.mockito.MockitoAnnotations;
107 import org.mockito.invocation.InvocationOnMock;
108 import org.mockito.stubbing.Answer;
109 
110 /** Unit tests for {@link GrpclbLoadBalancer}. */
111 @RunWith(JUnit4.class)
112 public class GrpclbLoadBalancerTest {
  // Attribute key for name-resolution attributes.
  // NOTE(review): not referenced in the visible part of this file; presumably used by tests
  // further down -- confirm.
  private static final Attributes.Key<String> RESOLUTION_ATTR =
      Attributes.Key.create("resolution-attr");
  // Authority returned by the mocked Helper#getAuthority(); also the service name expected in the
  // initial LoadBalanceRequest.
  private static final String SERVICE_AUTHORITY = "api.google.com";
  // Selects only the FakeClock tasks that are GrpclbState.LoadReportingTask instances.
  private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
      new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command instanceof GrpclbState.LoadReportingTask;
        }
      };
  // Selects only the FakeClock tasks that are GrpclbState.FallbackModeTask instances.
  private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER =
      new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command instanceof GrpclbState.FallbackModeTask;
        }
      };
  // Selects only the FakeClock tasks that are GrpclbState.LbRpcRetryTask instances.
  private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
      new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command instanceof GrpclbState.LbRpcRetryTask;
        }
      };
  // Attributes with GrpcAttributes.ATTR_LB_PROVIDED_BACKEND set to true.
  private static final Attributes LB_BACKEND_ATTRS =
      Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build();
139 
  // Mocked channel helper; its createOobChannel/runSerialized/updateBalancingState/getAuthority
  // calls are stubbed in setUp().
  @Mock
  private Helper helper;
  // Mocked pool; takeOrCreateSubchannel() is stubbed in setUp() to hand out mock Subchannels.
  @Mock
  private SubchannelPool subchannelPool;
  // The picker most recently passed to helper.updateBalancingState() (recorded by the stub in
  // setUp()).
  private SubchannelPicker currentPicker;
  // Mockito mock that delegates to a fake LB service implementation, so calls can be verified.
  private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
  // Captures the response observers handed to mockLbService.balanceLoad().
  @Captor
  private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
  // Fake time source: provides the timer service and the ticker read by timeProvider.
  private final FakeClock fakeClock = new FakeClock();
  // Request observers created by the fake balanceLoad(), one per LB stream; tests poll() them.
  private final LinkedList<StreamObserver<LoadBalanceRequest>> lbRequestObservers =
      new LinkedList<StreamObserver<LoadBalanceRequest>>();
  // Mock subchannels created through subchannelPool; tests poll() them as they are consumed.
  private final LinkedList<Subchannel> mockSubchannels = new LinkedList<Subchannel>();
  // OOB channels created through helper.createOobChannel().
  private final LinkedList<ManagedChannel> fakeOobChannels = new LinkedList<ManagedChannel>();
  // Every subchannel ever created through subchannelPool; checked in tearDown().
  private final ArrayList<Subchannel> subchannelTracker = new ArrayList<>();
  // Every OOB channel ever created; tearDown() asserts each one is shut down and terminated.
  private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>();
  // Authorities for which createOobChannel() connects to the non-existent "nonExistFakeLb" server
  // instead of the fake LB server.
  private final ArrayList<String> failingLbAuthorities = new ArrayList<>();
  // Time provider backed by fakeClock's ticker, so time only advances when the test says so.
  private final TimeProvider timeProvider = new TimeProvider() {
      @Override
      public long currentTimeNanos() {
        return fakeClock.getTicker().read();
      }
    };
  // In-process server named "fakeLb" that hosts mockLbService; started in setUp().
  private io.grpc.Server fakeLbServer;
  // Captures pickers passed to helper.updateBalancingState().
  @Captor
  private ArgumentCaptor<SubchannelPicker> pickerCaptor;
  // Executor that runs tasks submitted via helper.runSerialized(); built on a direct executor.
  private final SerializingExecutor channelExecutor =
      new SerializingExecutor(MoreExecutors.directExecutor());
  // Mocked pool stubbed in setUp() to return fakeClock's scheduled executor service.
  @Mock
  private ObjectPool<ScheduledExecutorService> timerServicePool;
  // Stubbed in setUp() to return backoffPolicy1 and then backoffPolicy2.
  @Mock
  private BackoffPolicy.Provider backoffPolicyProvider;
  // Stubbed in setUp() to return backoffs of 10ns and then 100ns.
  @Mock
  private BackoffPolicy backoffPolicy1;
  // Stubbed in setUp() to return backoffs of 10ns and then 100ns.
  @Mock
  private BackoffPolicy backoffPolicy2;
  // The load balancer under test; created in setUp() and shut down in tearDown().
  private GrpclbLoadBalancer balancer;
176 
177   @SuppressWarnings("unchecked")
178   @Before
setUp()179   public void setUp() throws Exception {
180     MockitoAnnotations.initMocks(this);
181     mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo(
182         new LoadBalancerGrpc.LoadBalancerImplBase() {
183           @Override
184           public StreamObserver<LoadBalanceRequest> balanceLoad(
185               final StreamObserver<LoadBalanceResponse> responseObserver) {
186             StreamObserver<LoadBalanceRequest> requestObserver =
187                 mock(StreamObserver.class);
188             Answer<Void> closeRpc = new Answer<Void>() {
189                 @Override
190                 public Void answer(InvocationOnMock invocation) {
191                   responseObserver.onCompleted();
192                   return null;
193                 }
194               };
195             doAnswer(closeRpc).when(requestObserver).onCompleted();
196             lbRequestObservers.add(requestObserver);
197             return requestObserver;
198           }
199         }));
200     fakeLbServer = InProcessServerBuilder.forName("fakeLb")
201         .directExecutor().addService(mockLbService).build().start();
202     doAnswer(new Answer<ManagedChannel>() {
203         @Override
204         public ManagedChannel answer(InvocationOnMock invocation) throws Throwable {
205           String authority = (String) invocation.getArguments()[1];
206           ManagedChannel channel;
207           if (failingLbAuthorities.contains(authority)) {
208             channel = InProcessChannelBuilder.forName("nonExistFakeLb").directExecutor()
209                 .overrideAuthority(authority).build();
210           } else {
211             channel = InProcessChannelBuilder.forName("fakeLb").directExecutor()
212                 .overrideAuthority(authority).build();
213           }
214           fakeOobChannels.add(channel);
215           oobChannelTracker.add(channel);
216           return channel;
217         }
218       }).when(helper).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
219     doAnswer(new Answer<Subchannel>() {
220         @Override
221         public Subchannel answer(InvocationOnMock invocation) throws Throwable {
222           Subchannel subchannel = mock(Subchannel.class);
223           EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
224           Attributes attrs = (Attributes) invocation.getArguments()[1];
225           when(subchannel.getAddresses()).thenReturn(eag);
226           when(subchannel.getAttributes()).thenReturn(attrs);
227           mockSubchannels.add(subchannel);
228           subchannelTracker.add(subchannel);
229           return subchannel;
230         }
231       }).when(subchannelPool).takeOrCreateSubchannel(
232           any(EquivalentAddressGroup.class), any(Attributes.class));
233     doAnswer(new Answer<Void>() {
234         @Override
235         public Void answer(InvocationOnMock invocation) throws Throwable {
236           Runnable task = (Runnable) invocation.getArguments()[0];
237           channelExecutor.execute(task);
238           return null;
239         }
240       }).when(helper).runSerialized(any(Runnable.class));
241     doAnswer(new Answer<Void>() {
242         @Override
243         public Void answer(InvocationOnMock invocation) throws Throwable {
244           currentPicker = (SubchannelPicker) invocation.getArguments()[1];
245           return null;
246         }
247       }).when(helper).updateBalancingState(
248           any(ConnectivityState.class), any(SubchannelPicker.class));
249     when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
250     ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService();
251     when(timerServicePool.getObject()).thenReturn(timerService);
252     when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
253     when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
254     when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
255     balancer = new GrpclbLoadBalancer(
256         helper,
257         subchannelPool,
258         timerServicePool,
259         timeProvider,
260         backoffPolicyProvider);
261     verify(subchannelPool).init(same(helper), same(timerService));
262   }
263 
264   @After
tearDown()265   public void tearDown() {
266     try {
267       if (balancer != null) {
268         channelExecutor.execute(new Runnable() {
269             @Override
270             public void run() {
271               balancer.shutdown();
272             }
273           });
274       }
275       for (ManagedChannel channel : oobChannelTracker) {
276         assertTrue(channel + " is shutdown", channel.isShutdown());
277         // balancer should have closed the LB stream, terminating the OOB channel.
278         assertTrue(channel + " is terminated", channel.isTerminated());
279       }
280       // GRPCLB manages subchannels only through subchannelPool
281       for (Subchannel subchannel: subchannelTracker) {
282         verify(subchannelPool).returnSubchannel(same(subchannel));
283         // Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if
284         // LoadBalancer has called it expectedly.
285         verify(subchannel, never()).shutdown();
286       }
287       verify(helper, never())
288           .createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
289       // No timer should linger after shutdown
290       assertThat(fakeClock.getPendingTasks()).isEmpty();
291     } finally {
292       if (fakeLbServer != null) {
293         fakeLbServer.shutdownNow();
294       }
295     }
296   }
297 
298   @Test
roundRobinPickerNoDrop()299   public void roundRobinPickerNoDrop() {
300     GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
301     Subchannel subchannel = mock(Subchannel.class);
302     BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
303     BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
304 
305     List<BackendEntry> pickList = Arrays.asList(b1, b2);
306     RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);
307 
308     PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
309     Metadata headers1 = new Metadata();
310     // The existing token on the headers will be replaced
311     headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
312     when(args1.getHeaders()).thenReturn(headers1);
313     assertSame(b1.result, picker.pickSubchannel(args1));
314     verify(args1).getHeaders();
315     assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
316 
317     PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
318     Metadata headers2 = new Metadata();
319     when(args2.getHeaders()).thenReturn(headers2);
320     assertSame(b2.result, picker.pickSubchannel(args2));
321     verify(args2).getHeaders();
322     assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
323 
324     PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
325     Metadata headers3 = new Metadata();
326     when(args3.getHeaders()).thenReturn(headers3);
327     assertSame(b1.result, picker.pickSubchannel(args3));
328     verify(args3).getHeaders();
329     assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
330 
331     verify(subchannel, never()).getAttributes();
332   }
333 
334 
335   @Test
roundRobinPickerWithDrop()336   public void roundRobinPickerWithDrop() {
337     assertTrue(DROP_PICK_RESULT.isDrop());
338     GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
339     Subchannel subchannel = mock(Subchannel.class);
340     // 1 out of 2 requests are to be dropped
341     DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003");
342     List<DropEntry> dropList = Arrays.asList(null, d);
343 
344     BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
345     BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
346     List<BackendEntry> pickList = Arrays.asList(b1, b2);
347     RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList);
348 
349     // dropList[0], pickList[0]
350     PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
351     Metadata headers1 = new Metadata();
352     headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
353     when(args1.getHeaders()).thenReturn(headers1);
354     assertSame(b1.result, picker.pickSubchannel(args1));
355     verify(args1).getHeaders();
356     assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
357 
358     // dropList[1]: drop
359     PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
360     Metadata headers2 = new Metadata();
361     when(args2.getHeaders()).thenReturn(headers2);
362     assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2));
363     verify(args2, never()).getHeaders();
364 
365     // dropList[0], pickList[1]
366     PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
367     Metadata headers3 = new Metadata();
368     when(args3.getHeaders()).thenReturn(headers3);
369     assertSame(b2.result, picker.pickSubchannel(args3));
370     verify(args3).getHeaders();
371     assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
372 
373     // dropList[1]: drop
374     PickSubchannelArgs args4 = mock(PickSubchannelArgs.class);
375     Metadata headers4 = new Metadata();
376     when(args4.getHeaders()).thenReturn(headers4);
377     assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4));
378     verify(args4, never()).getHeaders();
379 
380     // dropList[0], pickList[0]
381     PickSubchannelArgs args5 = mock(PickSubchannelArgs.class);
382     Metadata headers5 = new Metadata();
383     when(args5.getHeaders()).thenReturn(headers5);
384     assertSame(b1.result, picker.pickSubchannel(args5));
385     verify(args5).getHeaders();
386     assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
387 
388     verify(subchannel, never()).getAttributes();
389   }
390 
391   @Test
loadReporting()392   public void loadReporting() {
393     Metadata headers = new Metadata();
394     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
395     when(args.getHeaders()).thenReturn(headers);
396 
397     long loadReportIntervalMillis = 1983;
398     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
399     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
400     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
401 
402     // Fallback timer is started as soon as address is resolved.
403     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
404 
405     assertEquals(1, fakeOobChannels.size());
406     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
407     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
408     assertEquals(1, lbRequestObservers.size());
409     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
410     InOrder inOrder = inOrder(lbRequestObserver);
411     InOrder helperInOrder = inOrder(helper, subchannelPool);
412 
413     inOrder.verify(lbRequestObserver).onNext(
414         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
415                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
416             .build()));
417 
418     // Simulate receiving LB response
419     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
420     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
421 
422     // Load reporting task is scheduled
423     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
424     assertEquals(0, fakeClock.runDueTasks());
425 
426     List<ServerEntry> backends = Arrays.asList(
427         new ServerEntry("127.0.0.1", 2000, "token0001"),
428         new ServerEntry("token0001"),  // drop
429         new ServerEntry("127.0.0.1", 2010, "token0002"),
430         new ServerEntry("token0003"));  // drop
431 
432     lbResponseObserver.onNext(buildLbResponse(backends));
433 
434     assertEquals(2, mockSubchannels.size());
435     Subchannel subchannel1 = mockSubchannels.poll();
436     Subchannel subchannel2 = mockSubchannels.poll();
437     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
438     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
439     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
440     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
441 
442     helperInOrder.verify(helper, atLeast(1))
443         .updateBalancingState(eq(READY), pickerCaptor.capture());
444     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
445     assertThat(picker.dropList).containsExactly(
446         null,
447         new DropEntry(getLoadRecorder(), "token0001"),
448         null,
449         new DropEntry(getLoadRecorder(), "token0003")).inOrder();
450     assertThat(picker.pickList).containsExactly(
451         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
452         new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder();
453 
454     // Report, no data
455     assertNextReport(
456         inOrder, lbRequestObserver, loadReportIntervalMillis,
457         ClientStats.newBuilder().build());
458 
459     PickResult pick1 = picker.pickSubchannel(args);
460     assertSame(subchannel1, pick1.getSubchannel());
461     assertSame(getLoadRecorder(), pick1.getStreamTracerFactory());
462 
463     // Merely the pick will not be recorded as upstart.
464     assertNextReport(
465         inOrder, lbRequestObserver, loadReportIntervalMillis,
466         ClientStats.newBuilder().build());
467 
468     ClientStreamTracer tracer1 =
469         pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
470 
471     PickResult pick2 = picker.pickSubchannel(args);
472     assertNull(pick2.getSubchannel());
473     assertSame(DROP_PICK_RESULT, pick2);
474 
475     // Report includes upstart of pick1 and the drop of pick2
476     assertNextReport(
477         inOrder, lbRequestObserver, loadReportIntervalMillis,
478         ClientStats.newBuilder()
479             .setNumCallsStarted(2)
480             .setNumCallsFinished(1)  // pick2
481             .addCallsFinishedWithDrop(
482                 ClientStatsPerToken.newBuilder()
483                     .setLoadBalanceToken("token0001")
484                     .setNumCalls(1)          // pick2
485                     .build())
486             .build());
487 
488     PickResult pick3 = picker.pickSubchannel(args);
489     assertSame(subchannel2, pick3.getSubchannel());
490     assertSame(getLoadRecorder(), pick3.getStreamTracerFactory());
491     ClientStreamTracer tracer3 =
492         pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
493 
494     // pick3 has sent out headers
495     tracer3.outboundHeaders();
496 
497     // 3rd report includes pick3's upstart
498     assertNextReport(
499         inOrder, lbRequestObserver, loadReportIntervalMillis,
500         ClientStats.newBuilder()
501             .setNumCallsStarted(1)
502             .build());
503 
504     PickResult pick4 = picker.pickSubchannel(args);
505     assertNull(pick4.getSubchannel());
506     assertSame(DROP_PICK_RESULT, pick4);
507 
508     // pick1 ended without sending anything
509     tracer1.streamClosed(Status.CANCELLED);
510 
511     // 4th report includes end of pick1 and drop of pick4
512     assertNextReport(
513         inOrder, lbRequestObserver, loadReportIntervalMillis,
514         ClientStats.newBuilder()
515             .setNumCallsStarted(1)  // pick4
516             .setNumCallsFinished(2)
517             .setNumCallsFinishedWithClientFailedToSend(1)   // pick1
518             .addCallsFinishedWithDrop(
519                 ClientStatsPerToken.newBuilder()
520                     .setLoadBalanceToken("token0003")
521                     .setNumCalls(1)   // pick4
522                     .build())
523         .build());
524 
525     PickResult pick5 = picker.pickSubchannel(args);
526     assertSame(subchannel1, pick1.getSubchannel());
527     assertSame(getLoadRecorder(), pick5.getStreamTracerFactory());
528     ClientStreamTracer tracer5 =
529         pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
530 
531     // pick3 ended without receiving response headers
532     tracer3.streamClosed(Status.DEADLINE_EXCEEDED);
533 
534     // pick5 sent and received headers
535     tracer5.outboundHeaders();
536     tracer5.inboundHeaders();
537 
538     // 5th report includes pick3's end and pick5's upstart
539     assertNextReport(
540         inOrder, lbRequestObserver, loadReportIntervalMillis,
541         ClientStats.newBuilder()
542             .setNumCallsStarted(1)  // pick5
543             .setNumCallsFinished(1)  // pick3
544             .build());
545 
546     // pick5 ends
547     tracer5.streamClosed(Status.OK);
548 
549     // 6th report includes pick5's end
550     assertNextReport(
551         inOrder, lbRequestObserver, loadReportIntervalMillis,
552         ClientStats.newBuilder()
553             .setNumCallsFinished(1)
554             .setNumCallsFinishedKnownReceived(1)
555             .build());
556 
557     assertEquals(1, fakeClock.numPendingTasks());
558     // Balancer closes the stream, scheduled reporting task cancelled
559     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
560     assertEquals(0, fakeClock.numPendingTasks());
561 
562     // New stream created
563     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
564     lbResponseObserver = lbResponseObserverCaptor.getValue();
565     assertEquals(1, lbRequestObservers.size());
566     lbRequestObserver = lbRequestObservers.poll();
567     inOrder = inOrder(lbRequestObserver);
568 
569     inOrder.verify(lbRequestObserver).onNext(
570         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
571                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
572             .build()));
573 
574     // Load reporting is also requested
575     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
576 
577     // No picker created because balancer is still using the results from the last stream
578     helperInOrder.verify(helper, never())
579         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
580 
581     // Make a new pick on that picker.  It will not show up on the report of the new stream, because
582     // that picker is associated with the previous stream.
583     PickResult pick6 = picker.pickSubchannel(args);
584     assertNull(pick6.getSubchannel());
585     assertSame(DROP_PICK_RESULT, pick6);
586     assertNextReport(
587         inOrder, lbRequestObserver, loadReportIntervalMillis,
588         ClientStats.newBuilder().build());
589 
590     // New stream got the list update
591     lbResponseObserver.onNext(buildLbResponse(backends));
592 
593     // Same backends, thus no new subchannels
594     helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
595         any(EquivalentAddressGroup.class), any(Attributes.class));
596     // But the new RoundRobinEntries have a new loadRecorder, thus considered different from
597     // the previous list, thus a new picker is created
598     helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
599     picker = (RoundRobinPicker) pickerCaptor.getValue();
600 
601     PickResult pick1p = picker.pickSubchannel(args);
602     assertSame(subchannel1, pick1p.getSubchannel());
603     assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory());
604     pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
605 
606     // The pick from the new stream will be included in the report
607     assertNextReport(
608         inOrder, lbRequestObserver, loadReportIntervalMillis,
609         ClientStats.newBuilder()
610             .setNumCallsStarted(1)
611             .build());
612 
613     verify(args, atLeast(0)).getHeaders();
614     verifyNoMoreInteractions(args);
615   }
616 
617   @Test
abundantInitialResponse()618   public void abundantInitialResponse() {
619     Metadata headers = new Metadata();
620     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
621     when(args.getHeaders()).thenReturn(headers);
622 
623     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
624     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
625     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
626     assertEquals(1, fakeOobChannels.size());
627     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
628     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
629 
630     // Simulate LB initial response
631     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
632     lbResponseObserver.onNext(buildInitialResponse(1983));
633 
634     // Load reporting task is scheduled
635     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
636     FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next();
637     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
638 
639     // Simulate an abundant LB initial response, with a different report interval
640     lbResponseObserver.onNext(buildInitialResponse(9097));
641     // It doesn't affect load-reporting at all
642     assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
643         .containsExactly(scheduledTask);
644     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
645   }
646 
647   @Test
raceBetweenLoadReportingAndLbStreamClosure()648   public void raceBetweenLoadReportingAndLbStreamClosure() {
649     Metadata headers = new Metadata();
650     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
651     when(args.getHeaders()).thenReturn(headers);
652 
653     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
654     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
655     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
656     assertEquals(1, fakeOobChannels.size());
657     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
658     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
659     assertEquals(1, lbRequestObservers.size());
660     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
661     InOrder inOrder = inOrder(lbRequestObserver);
662 
663     inOrder.verify(lbRequestObserver).onNext(
664         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
665                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
666             .build()));
667 
668     // Simulate receiving LB response
669     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
670     lbResponseObserver.onNext(buildInitialResponse(1983));
671 
672     // Load reporting task is scheduled
673     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
674     FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next();
675     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
676 
677     // Close lbStream
678     lbResponseObserver.onCompleted();
679 
680     // Reporting task cancelled
681     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
682 
683     // Simulate a race condition where the task has just started when its cancelled
684     scheduledTask.command.run();
685 
686     // No report sent. No new task scheduled
687     inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class));
688     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
689   }
690 
assertNextReport( InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, long loadReportIntervalMillis, ClientStats expectedReport)691   private void assertNextReport(
692       InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver,
693       long loadReportIntervalMillis, ClientStats expectedReport) {
694     assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS));
695     inOrder.verifyNoMoreInteractions();
696     assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS));
697     assertEquals(1, fakeClock.numPendingTasks());
698     inOrder.verify(lbRequestObserver).onNext(
699         eq(LoadBalanceRequest.newBuilder()
700             .setClientStats(
701                 ClientStats.newBuilder(expectedReport)
702                 .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read()))
703                 .build())
704             .build()));
705   }
706 
707   @Test
acquireAndReleaseScheduledExecutor()708   public void acquireAndReleaseScheduledExecutor() {
709     verify(timerServicePool).getObject();
710     verifyNoMoreInteractions(timerServicePool);
711 
712     balancer.shutdown();
713     verify(timerServicePool).returnObject(same(fakeClock.getScheduledExecutorService()));
714     verifyNoMoreInteractions(timerServicePool);
715   }
716 
717   @Test
nameResolutionFailsThenRecoverToDelegate()718   public void nameResolutionFailsThenRecoverToDelegate() {
719     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
720     deliverNameResolutionError(error);
721     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
722     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
723     assertThat(picker.dropList).isEmpty();
724     assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
725 
726     // Recover with a subsequent success
727     List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);
728 
729     Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
730     deliverResolvedAddresses(resolvedServers, resolutionAttrs);
731   }
732 
733   @Test
nameResolutionFailsThenRecoverToGrpclb()734   public void nameResolutionFailsThenRecoverToGrpclb() {
735     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
736     deliverNameResolutionError(error);
737     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
738     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
739     assertThat(picker.dropList).isEmpty();
740     assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
741 
742     // Recover with a subsequent success
743     List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
744     EquivalentAddressGroup eag = resolvedServers.get(0);
745 
746     Attributes resolutionAttrs = Attributes.EMPTY;
747     deliverResolvedAddresses(resolvedServers, resolutionAttrs);
748 
749     verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
750     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
751   }
752 
753   @Test
grpclbThenNameResolutionFails()754   public void grpclbThenNameResolutionFails() {
755     InOrder inOrder = inOrder(helper, subchannelPool);
756     // Go to GRPCLB first
757     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
758     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
759     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
760 
761     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
762     assertEquals(1, fakeOobChannels.size());
763     ManagedChannel oobChannel = fakeOobChannels.poll();
764     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
765     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
766 
767     // Let name resolution fail before round-robin list is ready
768     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
769     deliverNameResolutionError(error);
770 
771     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
772     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
773     assertThat(picker.dropList).isEmpty();
774     assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
775     assertFalse(oobChannel.isShutdown());
776 
777     // Simulate receiving LB response
778     List<ServerEntry> backends = Arrays.asList(
779         new ServerEntry("127.0.0.1", 2000, "TOKEN1"),
780         new ServerEntry("127.0.0.1", 2010, "TOKEN2"));
781     verify(helper, never()).runSerialized(any(Runnable.class));
782     lbResponseObserver.onNext(buildInitialResponse());
783     lbResponseObserver.onNext(buildLbResponse(backends));
784 
785     verify(helper, times(2)).runSerialized(any(Runnable.class));
786     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
787         eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
788         any(Attributes.class));
789     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
790         eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)),
791         any(Attributes.class));
792   }
793 
794   @Test
grpclbUpdatedAddresses_avoidsReconnect()795   public void grpclbUpdatedAddresses_avoidsReconnect() {
796     List<EquivalentAddressGroup> grpclbResolutionList =
797         createResolvedServerAddresses(true, false);
798     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
799     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
800 
801     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
802     ManagedChannel oobChannel = fakeOobChannels.poll();
803     assertEquals(1, lbRequestObservers.size());
804 
805     List<EquivalentAddressGroup> grpclbResolutionList2 =
806         createResolvedServerAddresses(true, false, true);
807     EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList(
808         grpclbResolutionList2.get(0).getAddresses().get(0),
809         grpclbResolutionList2.get(2).getAddresses().get(0)),
810         lbAttributes(lbAuthority(0)));
811     deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs);
812     verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag));
813     assertEquals(1, lbRequestObservers.size()); // No additional RPC
814   }
815 
816   @Test
grpclbUpdatedAddresses_reconnectOnAuthorityChange()817   public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
818     List<EquivalentAddressGroup> grpclbResolutionList =
819         createResolvedServerAddresses(true, false);
820     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
821     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
822 
823     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
824     ManagedChannel oobChannel = fakeOobChannels.poll();
825     assertEquals(1, lbRequestObservers.size());
826 
827     final String newAuthority = "some-new-authority";
828     List<EquivalentAddressGroup> grpclbResolutionList2 =
829         createResolvedServerAddresses(false);
830     grpclbResolutionList2.add(new EquivalentAddressGroup(
831         new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority)));
832     deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs);
833     assertTrue(oobChannel.isTerminated());
834     verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority));
835     assertEquals(2, lbRequestObservers.size()); // An additional RPC
836   }
837 
838   @Test
grpclbWorking()839   public void grpclbWorking() {
840     InOrder inOrder = inOrder(helper, subchannelPool);
841     List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
842     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
843     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
844 
845     // Fallback timer is started as soon as the addresses are resolved.
846     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
847 
848     verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
849     assertEquals(1, fakeOobChannels.size());
850     ManagedChannel oobChannel = fakeOobChannels.poll();
851     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
852     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
853     assertEquals(1, lbRequestObservers.size());
854     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
855     verify(lbRequestObserver).onNext(
856         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
857                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
858             .build()));
859 
860     // Simulate receiving LB response
861     List<ServerEntry> backends1 = Arrays.asList(
862         new ServerEntry("127.0.0.1", 2000, "token0001"),
863         new ServerEntry("127.0.0.1", 2010, "token0002"));
864     inOrder.verify(helper, never())
865         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
866     lbResponseObserver.onNext(buildInitialResponse());
867     lbResponseObserver.onNext(buildLbResponse(backends1));
868 
869     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
870         eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
871         any(Attributes.class));
872     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
873         eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
874         any(Attributes.class));
875     assertEquals(2, mockSubchannels.size());
876     Subchannel subchannel1 = mockSubchannels.poll();
877     Subchannel subchannel2 = mockSubchannels.poll();
878     verify(subchannel1).requestConnection();
879     verify(subchannel2).requestConnection();
880     assertEquals(
881         new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS),
882         subchannel1.getAddresses());
883     assertEquals(
884         new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS),
885         subchannel2.getAddresses());
886 
887     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
888     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
889     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
890     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
891     assertThat(picker0.dropList).containsExactly(null, null);
892     assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
893     inOrder.verifyNoMoreInteractions();
894 
895     // Let subchannels be connected
896     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
897     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
898     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
899 
900     assertThat(picker1.dropList).containsExactly(null, null);
901     assertThat(picker1.pickList).containsExactly(
902         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
903 
904     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
905     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
906     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
907     assertThat(picker2.dropList).containsExactly(null, null);
908     assertThat(picker2.pickList).containsExactly(
909         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
910         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
911         .inOrder();
912 
913     // Disconnected subchannels
914     verify(subchannel1).requestConnection();
915     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
916     verify(subchannel1, times(2)).requestConnection();
917     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
918     RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
919     assertThat(picker3.dropList).containsExactly(null, null);
920     assertThat(picker3.pickList).containsExactly(
921         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
922 
923     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
924     inOrder.verifyNoMoreInteractions();
925 
926     // As long as there is at least one READY subchannel, round robin will work.
927     Status error1 = Status.UNAVAILABLE.withDescription("error1");
928     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1));
929     inOrder.verifyNoMoreInteractions();
930 
931     // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
932     verify(subchannel2).requestConnection();
933     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
934     verify(subchannel2, times(2)).requestConnection();
935     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
936     RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
937     assertThat(picker4.dropList).containsExactly(null, null);
938     assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY);
939 
940     // Update backends, with a drop entry
941     List<ServerEntry> backends2 =
942         Arrays.asList(
943             new ServerEntry("127.0.0.1", 2030, "token0003"),  // New address
944             new ServerEntry("token0003"),  // drop
945             new ServerEntry("127.0.0.1", 2010, "token0004"),  // Existing address with token changed
946             new ServerEntry("127.0.0.1", 2030, "token0005"),  // New address appearing second time
947             new ServerEntry("token0006"));  // drop
948     verify(subchannelPool, never()).returnSubchannel(same(subchannel1));
949 
950     lbResponseObserver.onNext(buildLbResponse(backends2));
951     // not in backends2, closed
952     verify(subchannelPool).returnSubchannel(same(subchannel1));
953     // backends2[2], will be kept
954     verify(subchannelPool, never()).returnSubchannel(same(subchannel2));
955 
956     inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
957         eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)),
958         any(Attributes.class));
959     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
960         eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)),
961         any(Attributes.class));
962     assertEquals(1, mockSubchannels.size());
963     Subchannel subchannel3 = mockSubchannels.poll();
964     verify(subchannel3).requestConnection();
965     assertEquals(
966         new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS),
967         subchannel3.getAddresses());
968     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
969     RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
970     assertThat(picker7.dropList).containsExactly(
971         null,
972         new DropEntry(getLoadRecorder(), "token0003"),
973         null,
974         null,
975         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
976     assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY);
977 
978     // State updates on obsolete subchannel1 will have no effect
979     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
980     deliverSubchannelState(
981         subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
982     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
983     inOrder.verifyNoMoreInteractions();
984 
985     deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
986     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
987     RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
988     assertThat(picker8.dropList).containsExactly(
989         null,
990         new DropEntry(getLoadRecorder(), "token0003"),
991         null,
992         null,
993         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
994     // subchannel2 is still IDLE, thus not in the active list
995     assertThat(picker8.pickList).containsExactly(
996         new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
997         new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
998     // subchannel2 becomes READY and makes it into the list
999     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
1000     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1001     RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
1002     assertThat(picker9.dropList).containsExactly(
1003         null,
1004         new DropEntry(getLoadRecorder(), "token0003"),
1005         null,
1006         null,
1007         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
1008     assertThat(picker9.pickList).containsExactly(
1009         new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
1010         new BackendEntry(subchannel2, getLoadRecorder(), "token0004"),
1011         new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
1012     verify(subchannelPool, never()).returnSubchannel(same(subchannel3));
1013 
1014     // Update backends, with no entry
1015     lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
1016     verify(subchannelPool).returnSubchannel(same(subchannel2));
1017     verify(subchannelPool).returnSubchannel(same(subchannel3));
1018     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
1019     RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue();
1020     assertThat(picker10.dropList).isEmpty();
1021     assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY);
1022 
1023     assertFalse(oobChannel.isShutdown());
1024     assertEquals(0, lbRequestObservers.size());
1025     verify(lbRequestObserver, never()).onCompleted();
1026     verify(lbRequestObserver, never()).onError(any(Throwable.class));
1027 
1028     // Load reporting was not requested, thus never scheduled
1029     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
1030 
1031     verify(subchannelPool, never()).clear();
1032     balancer.shutdown();
1033     verify(subchannelPool).clear();
1034   }
1035 
1036   @Test
grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires()1037   public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() {
1038     subtestGrpclbFallbackInitialTimeout(false);
1039   }
1040 
1041   @Test
grpclbFallback_initialTimeout_timerExpires()1042   public void grpclbFallback_initialTimeout_timerExpires() {
1043     subtestGrpclbFallbackInitialTimeout(true);
1044   }
1045 
1046   // Fallback or not within the period of the initial timeout.
subtestGrpclbFallbackInitialTimeout(boolean timerExpires)1047   private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
1048     long loadReportIntervalMillis = 1983;
1049     InOrder inOrder = inOrder(helper, subchannelPool);
1050 
1051     // Create a resolution list with a mixture of balancer and backend addresses
1052     List<EquivalentAddressGroup> resolutionList =
1053         createResolvedServerAddresses(false, true, false);
1054     Attributes resolutionAttrs = Attributes.EMPTY;
1055     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1056 
1057     inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
1058 
1059     // Attempted to connect to balancer
1060     assertEquals(1, fakeOobChannels.size());
1061     ManagedChannel oobChannel = fakeOobChannels.poll();
1062     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1063     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1064     assertEquals(1, lbRequestObservers.size());
1065     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1066 
1067     verify(lbRequestObserver).onNext(
1068         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1069                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1070             .build()));
1071     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1072     // We don't care if runSerialized() has been run.
1073     inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
1074     inOrder.verifyNoMoreInteractions();
1075 
1076     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1077     fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
1078     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1079 
1080     /////////////////////////////////////////////
1081     // Break the LB stream before timer expires
1082     /////////////////////////////////////////////
1083     Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
1084     lbResponseObserver.onError(streamError.asException());
1085     // Not in fallback mode. The error will be propagated.
1086     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1087     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
1088     assertThat(picker.dropList).isEmpty();
1089     ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
1090     Status status = errorEntry.result.getStatus();
1091     assertThat(status.getCode()).isEqualTo(streamError.getCode());
1092     assertThat(status.getDescription()).contains(streamError.getDescription());
1093     // A new stream is created
1094     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
1095     lbResponseObserver = lbResponseObserverCaptor.getValue();
1096     assertEquals(1, lbRequestObservers.size());
1097     lbRequestObserver = lbRequestObservers.poll();
1098     verify(lbRequestObserver).onNext(
1099         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1100                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1101             .build()));
1102 
1103     //////////////////////////////////
1104     // Fallback timer expires (or not)
1105     //////////////////////////////////
1106     if (timerExpires) {
1107       fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
1108       assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1109       // Fall back to the backends from resolver
1110       fallbackTestVerifyUseOfFallbackBackendLists(
1111           inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
1112 
1113       assertFalse(oobChannel.isShutdown());
1114       verify(lbRequestObserver, never()).onCompleted();
1115     }
1116 
1117     ////////////////////////////////////////////////////////
1118     // Name resolver sends new list without any backend addr
1119     ////////////////////////////////////////////////////////
1120     resolutionList = createResolvedServerAddresses(true, true);
1121     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1122 
1123     // New addresses are updated to the OobChannel
1124     inOrder.verify(helper).updateOobChannelAddresses(
1125         same(oobChannel),
1126         eq(new EquivalentAddressGroup(
1127                 Arrays.asList(
1128                     resolutionList.get(0).getAddresses().get(0),
1129                     resolutionList.get(1).getAddresses().get(0)),
1130                 lbAttributes(lbAuthority(0)))));
1131 
1132     if (timerExpires) {
1133       // Still in fallback logic, except that the backend list is empty
1134       fallbackTestVerifyUseOfFallbackBackendLists(
1135           inOrder, Collections.<EquivalentAddressGroup>emptyList());
1136     }
1137 
1138     //////////////////////////////////////////////////
1139     // Name resolver sends new list with backend addrs
1140     //////////////////////////////////////////////////
1141     resolutionList = createResolvedServerAddresses(true, false, false);
1142     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1143 
1144     // New LB address is updated to the OobChannel
1145     inOrder.verify(helper).updateOobChannelAddresses(
1146         same(oobChannel),
1147         eq(resolutionList.get(0)));
1148 
1149     if (timerExpires) {
1150       // New backend addresses are used for fallback
1151       fallbackTestVerifyUseOfFallbackBackendLists(
1152           inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
1153     }
1154 
1155     ////////////////////////////////////////////////
1156     // Break the LB stream after the timer expires
1157     ////////////////////////////////////////////////
1158     if (timerExpires) {
1159       lbResponseObserver.onError(streamError.asException());
1160 
1161       // The error will NOT propagate to picker because fallback list is in use.
1162       inOrder.verify(helper, never())
1163           .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1164       // A new stream is created
1165       verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture());
1166       lbResponseObserver = lbResponseObserverCaptor.getValue();
1167       assertEquals(1, lbRequestObservers.size());
1168       lbRequestObserver = lbRequestObservers.poll();
1169       verify(lbRequestObserver).onNext(
1170           eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1171                   InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1172               .build()));
1173     }
1174 
1175     /////////////////////////////////
1176     // Balancer returns a server list
1177     /////////////////////////////////
1178     List<ServerEntry> serverList = Arrays.asList(
1179         new ServerEntry("127.0.0.1", 2000, "token0001"),
1180         new ServerEntry("127.0.0.1", 2010, "token0002"));
1181     lbResponseObserver.onNext(buildInitialResponse());
1182     lbResponseObserver.onNext(buildLbResponse(serverList));
1183 
1184     // Balancer-provided server list now in effect
1185     fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1186 
1187     ///////////////////////////////////////////////////////////////
1188     // New backend addresses from resolver outside of fallback mode
1189     ///////////////////////////////////////////////////////////////
1190     resolutionList = createResolvedServerAddresses(true, false);
1191     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1192     // Will not affect the round robin list at all
1193     inOrder.verify(helper, never())
1194         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1195 
1196     // No fallback timeout timer scheduled.
1197     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1198   }
1199 
1200   @Test
grpclbFallback_balancerLost()1201   public void grpclbFallback_balancerLost() {
1202     subtestGrpclbFallbackConnectionLost(true, false);
1203   }
1204 
1205   @Test
grpclbFallback_subchannelsLost()1206   public void grpclbFallback_subchannelsLost() {
1207     subtestGrpclbFallbackConnectionLost(false, true);
1208   }
1209 
1210   @Test
grpclbFallback_allLost()1211   public void grpclbFallback_allLost() {
1212     subtestGrpclbFallbackConnectionLost(true, true);
1213   }
1214 
1215   // Fallback outside of the initial timeout, where all connections are lost.
subtestGrpclbFallbackConnectionLost( boolean balancerBroken, boolean allSubchannelsBroken)1216   private void subtestGrpclbFallbackConnectionLost(
1217       boolean balancerBroken, boolean allSubchannelsBroken) {
1218     long loadReportIntervalMillis = 1983;
1219     InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);
1220 
1221     // Create a resolution list with a mixture of balancer and backend addresses
1222     List<EquivalentAddressGroup> resolutionList =
1223         createResolvedServerAddresses(false, true, false);
1224     Attributes resolutionAttrs = Attributes.EMPTY;
1225     deliverResolvedAddresses(resolutionList, resolutionAttrs);
1226 
1227     inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
1228 
1229     // Attempted to connect to balancer
1230     assertEquals(1, fakeOobChannels.size());
1231     fakeOobChannels.poll();
1232     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1233     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1234     assertEquals(1, lbRequestObservers.size());
1235     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1236 
1237     verify(lbRequestObserver).onNext(
1238         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1239                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1240             .build()));
1241     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1242     // We don't care if runSerialized() has been run.
1243     inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
1244     inOrder.verifyNoMoreInteractions();
1245 
1246     // Balancer returns a server list
1247     List<ServerEntry> serverList = Arrays.asList(
1248         new ServerEntry("127.0.0.1", 2000, "token0001"),
1249         new ServerEntry("127.0.0.1", 2010, "token0002"));
1250     lbResponseObserver.onNext(buildInitialResponse());
1251     lbResponseObserver.onNext(buildLbResponse(serverList));
1252 
1253     List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1254 
1255     // Break connections
1256     if (balancerBroken) {
1257       lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1258       // A new stream to LB is created
1259       inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1260       lbResponseObserver = lbResponseObserverCaptor.getValue();
1261       assertEquals(1, lbRequestObservers.size());
1262       lbRequestObserver = lbRequestObservers.poll();
1263     }
1264     if (allSubchannelsBroken) {
1265       for (Subchannel subchannel : subchannels) {
1266         // A READY subchannel transits to IDLE when receiving a go-away
1267         deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1268       }
1269     }
1270 
1271     if (balancerBroken && allSubchannelsBroken) {
1272       // Going into fallback
1273       subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
1274           inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
1275 
1276       // When in fallback mode, fallback timer should not be scheduled when all backend
1277       // connections are lost
1278       for (Subchannel subchannel : subchannels) {
1279         deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1280       }
1281 
1282       // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
1283       List<ServerEntry> serverList2 = Arrays.asList(
1284           new ServerEntry("127.0.0.1", 2001, "token0003"),
1285           new ServerEntry("127.0.0.1", 2011, "token0004"));
1286       lbResponseObserver.onNext(buildInitialResponse());
1287       lbResponseObserver.onNext(buildLbResponse(serverList2));
1288 
1289       fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2);
1290     }
1291     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1292 
1293     if (!(balancerBroken && allSubchannelsBroken)) {
1294       verify(subchannelPool, never()).takeOrCreateSubchannel(
1295           eq(resolutionList.get(0)), any(Attributes.class));
1296       verify(subchannelPool, never()).takeOrCreateSubchannel(
1297           eq(resolutionList.get(2)), any(Attributes.class));
1298     }
1299   }
1300 
fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs)1301   private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
1302       InOrder inOrder, List<EquivalentAddressGroup> addrs) {
1303     return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null);
1304   }
1305 
fallbackTestVerifyUseOfBalancerBackendLists( InOrder inOrder, List<ServerEntry> servers)1306   private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
1307       InOrder inOrder, List<ServerEntry> servers) {
1308     ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>();
1309     ArrayList<String> tokens = new ArrayList<>();
1310     for (ServerEntry server : servers) {
1311       addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS));
1312       tokens.add(server.token);
1313     }
1314     return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens);
1315   }
1316 
  /**
   * Verifies that one subchannel per entry of {@code addrs} was requested from
   * {@code subchannelPool}, then moves those subchannels to CONNECTING and on to READY while
   * checking the picker passed to {@code helper.updateBalancingState} after each READY delivery.
   *
   * @param inOrder in-order verifier used for the {@code subchannelPool} and {@code helper} checks
   * @param addrs address groups expected to be passed to {@code takeOrCreateSubchannel}, in order
   * @param tokens tokens used to build the expected {@code BackendEntry} for each address, or
   *     {@code null} to expect entries built from the subchannel alone; when non-null it must
   *     have the same size as {@code addrs}
   * @return a copy of the subchannels that were in {@code mockSubchannels} when this was called
   */
  private List<Subchannel> fallbackTestVerifyUseOfBackendLists(
      InOrder inOrder, List<EquivalentAddressGroup> addrs,
      @Nullable List<String> tokens) {
    if (tokens != null) {
      assertEquals(addrs.size(), tokens.size());
    }
    // One takeOrCreateSubchannel call is expected per address, in list order.
    for (EquivalentAddressGroup addr : addrs) {
      inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class));
    }
    // At this point the current picker is expected to have an all-null drop list of the same
    // length as addrs and a pick list containing only the buffer entry.
    RoundRobinPicker picker = (RoundRobinPicker) currentPicker;
    assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
    assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY);
    assertEquals(addrs.size(), mockSubchannels.size());
    // Snapshot and reset mockSubchannels; the size check above only holds on a later call if
    // subchannels recorded for this call have been removed.
    ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels);
    mockSubchannels.clear();
    for (Subchannel subchannel : subchannels) {
      deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
    }
    // atLeast(0) accepts any number of CONNECTING updates (including none); the never() check
    // that follows then rejects any further balancing-state update.
    inOrder.verify(helper, atLeast(0))
        .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
    inOrder.verify(helper, never())
        .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

    // Make the subchannels READY one at a time. After each delivery a READY update is expected
    // whose picker holds exactly the backends made READY so far.
    ArrayList<BackendEntry> pickList = new ArrayList<>();
    for (int i = 0; i < addrs.size(); i++) {
      Subchannel subchannel = subchannels.get(i);
      BackendEntry backend;
      if (tokens == null) {
        backend = new BackendEntry(subchannel);
      } else {
        backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i));
      }
      pickList.add(backend);
      deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
      inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
      picker = (RoundRobinPicker) pickerCaptor.getValue();
      assertThat(picker.dropList)
          .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
      assertThat(picker.pickList).containsExactlyElementsIn(pickList);
      // No additional balancing-state update may follow the READY one.
      inOrder.verify(helper, never())
          .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
    }
    return subchannels;
  }
1361 
1362   @Test
grpclbMultipleAuthorities()1363   public void grpclbMultipleAuthorities() throws Exception {
1364     List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList(
1365         new EquivalentAddressGroup(
1366             new FakeSocketAddress("fake-address-1"),
1367             lbAttributes("fake-authority-1")),
1368         new EquivalentAddressGroup(
1369             new FakeSocketAddress("fake-address-2"),
1370             lbAttributes("fake-authority-2")),
1371         new EquivalentAddressGroup(
1372             new FakeSocketAddress("not-a-lb-address")),
1373         new EquivalentAddressGroup(
1374             new FakeSocketAddress("fake-address-3"),
1375             lbAttributes("fake-authority-1")));
1376     final EquivalentAddressGroup goldenOobChannelEag = new EquivalentAddressGroup(
1377         Arrays.<SocketAddress>asList(
1378             new FakeSocketAddress("fake-address-1"),
1379             new FakeSocketAddress("fake-address-3")),
1380         lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day
1381 
1382     Attributes grpclbResolutionAttrs = Attributes.EMPTY;
1383     deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
1384 
1385     verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
1386   }
1387 
  /**
   * Exercises retry of the balancer RPC. The stream is completed or failed several times and the
   * test checks, by advancing the fake clock, when each new {@code balanceLoad} call is made and
   * which backoff policy is consulted: retries are delayed by the backoff policy while the
   * balancer has not responded, and a retry happens immediately with a fresh policy from the
   * provider once an initial response has been received on the failed stream.
   */
  @Test
  public void grpclbBalancerStreamRetry() throws Exception {
    LoadBalanceRequest expectedInitialRequest =
        LoadBalanceRequest.newBuilder()
            .setInitialRequest(
                InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
            .build();
    InOrder inOrder =
        inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
    List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
    Attributes grpclbResolutionAttrs = Attributes.EMPTY;
    deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);

    assertEquals(1, fakeOobChannels.size());
    // Remove the channel from fakeOobChannels; the value itself is not used in this test.
    @SuppressWarnings("unused")
    ManagedChannel oobChannel = fakeOobChannels.poll();

    // First balancer RPC
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    // No retry task is pending while the stream is open.
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Balancer closes it immediately (erroneously)
    lbResponseObserver.onCompleted();
    // Will start backoff sequence 1 (10ns)
    inOrder.verify(backoffPolicyProvider).get();
    inOrder.verify(backoffPolicy1).nextBackoffNanos();
    assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Fast-forward to a moment before the retry
    fakeClock.forwardNanos(9);
    verifyNoMoreInteractions(mockLbService);
    // Then time for retry
    fakeClock.forwardNanos(1);
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Balancer closes it with an error.
    lbResponseObserver.onError(Status.UNAVAILABLE.asException());
    // Will continue the backoff sequence 1 (100ns)
    // The provider must not be asked for a new policy here; backoffPolicy1 supplies the delay.
    verifyNoMoreInteractions(backoffPolicyProvider);
    inOrder.verify(backoffPolicy1).nextBackoffNanos();
    assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Fast-forward to a moment before the retry
    fakeClock.forwardNanos(100 - 1);
    verifyNoMoreInteractions(mockLbService);
    // Then time for retry
    fakeClock.forwardNanos(1);
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Balancer sends initial response.
    lbResponseObserver.onNext(buildInitialResponse());

    // Then breaks the RPC
    lbResponseObserver.onError(Status.UNAVAILABLE.asException());

    // Will reset the retry sequence and retry immediately, because balancer has responded.
    inOrder.verify(backoffPolicyProvider).get();
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    lbResponseObserver = lbResponseObserverCaptor.getValue();
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));

    // Fail the retry after spending 4ns
    fakeClock.forwardNanos(4);
    lbResponseObserver.onError(Status.UNAVAILABLE.asException());

    // Will be on the first retry (10ns) of backoff sequence 2.
    inOrder.verify(backoffPolicy2).nextBackoffNanos();
    assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Fast-forward to a moment before the retry, the time spent in the last try is deducted.
    fakeClock.forwardNanos(10 - 4 - 1);
    verifyNoMoreInteractions(mockLbService);
    // Then time for retry
    fakeClock.forwardNanos(1);
    // The response observer of this last stream is never driven, so the captured value is not
    // read back.
    inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
    assertEquals(1, lbRequestObservers.size());
    lbRequestObserver = lbRequestObservers.poll();
    verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
    assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));

    // Wrapping up
    // Total call counts over the whole test, checked without regard to ordering.
    verify(backoffPolicyProvider, times(2)).get();
    verify(backoffPolicy1, times(2)).nextBackoffNanos();
    verify(backoffPolicy2, times(1)).nextBackoffNanos();
  }
1489 
deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState)1490   private void deliverSubchannelState(
1491       final Subchannel subchannel, final ConnectivityStateInfo newState) {
1492     channelExecutor.execute(new Runnable() {
1493         @Override
1494         public void run() {
1495           balancer.handleSubchannelState(subchannel, newState);
1496         }
1497       });
1498   }
1499 
deliverNameResolutionError(final Status error)1500   private void deliverNameResolutionError(final Status error) {
1501     channelExecutor.execute(new Runnable() {
1502         @Override
1503         public void run() {
1504           balancer.handleNameResolutionError(error);
1505         }
1506       });
1507   }
1508 
deliverResolvedAddresses( final List<EquivalentAddressGroup> addrs, final Attributes attrs)1509   private void deliverResolvedAddresses(
1510       final List<EquivalentAddressGroup> addrs, final Attributes attrs) {
1511     channelExecutor.execute(new Runnable() {
1512         @Override
1513         public void run() {
1514           balancer.handleResolvedAddressGroups(addrs, attrs);
1515         }
1516       });
1517   }
1518 
  /**
   * Returns the load recorder obtained from {@code balancer.getGrpclbState()}.
   *
   * <p>NOTE(review): no null check is made on the state returned by the balancer; callers
   * presumably only use this once the balancer has created it - confirm.
   */
  private GrpclbClientLoadRecorder getLoadRecorder() {
    return balancer.getGrpclbState().getLoadRecorder();
  }
1522 
createResolvedServerAddresses(boolean ... isLb)1523   private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) {
1524     ArrayList<EquivalentAddressGroup> list = new ArrayList<>();
1525     for (int i = 0; i < isLb.length; i++) {
1526       SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
1527       EquivalentAddressGroup eag =
1528           new EquivalentAddressGroup(
1529               addr,
1530               isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY);
1531       list.add(eag);
1532     }
1533     return list;
1534   }
1535 
lbAuthority(int unused)1536   private static String lbAuthority(int unused) {
1537     // TODO(ejona): Support varying authorities
1538     return "lb.google.com";
1539   }
1540 
lbAttributes(String authority)1541   private static Attributes lbAttributes(String authority) {
1542     return Attributes.newBuilder()
1543         .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
1544         .build();
1545   }
1546 
buildInitialResponse()1547   private static LoadBalanceResponse buildInitialResponse() {
1548     return buildInitialResponse(0);
1549   }
1550 
buildInitialResponse(long loadReportIntervalMillis)1551   private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) {
1552     return LoadBalanceResponse.newBuilder()
1553         .setInitialResponse(
1554             InitialLoadBalanceResponse.newBuilder()
1555             .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis)))
1556         .build();
1557   }
1558 
buildLbResponse(List<ServerEntry> servers)1559   private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) {
1560     ServerList.Builder serverListBuilder = ServerList.newBuilder();
1561     for (ServerEntry server : servers) {
1562       if (server.addr != null) {
1563         serverListBuilder.addServers(Server.newBuilder()
1564             .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress()))
1565             .setPort(server.addr.getPort())
1566             .setLoadBalanceToken(server.token)
1567             .build());
1568       } else {
1569         serverListBuilder.addServers(Server.newBuilder()
1570             .setDrop(true)
1571             .setLoadBalanceToken(server.token)
1572             .build());
1573       }
1574     }
1575     return LoadBalanceResponse.newBuilder()
1576         .setServerList(serverListBuilder.build())
1577         .build();
1578   }
1579 
1580   private static class ServerEntry {
1581     final InetSocketAddress addr;
1582     final String token;
1583 
ServerEntry(String host, int port, String token)1584     ServerEntry(String host, int port, String token) {
1585       this.addr = new InetSocketAddress(host, port);
1586       this.token = token;
1587     }
1588 
1589     // Drop entry
ServerEntry(String token)1590     ServerEntry(String token) {
1591       this.addr = null;
1592       this.token = token;
1593     }
1594   }
1595 }
1596