1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ConnectivityState.CONNECTING;
23 import static io.grpc.ConnectivityState.IDLE;
24 import static io.grpc.ConnectivityState.READY;
25 import static io.grpc.ConnectivityState.SHUTDOWN;
26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Objects;
31 import com.google.protobuf.util.Durations;
32 import io.grpc.Attributes;
33 import io.grpc.ConnectivityState;
34 import io.grpc.ConnectivityStateInfo;
35 import io.grpc.EquivalentAddressGroup;
36 import io.grpc.InternalLogId;
37 import io.grpc.LoadBalancer.Helper;
38 import io.grpc.LoadBalancer.PickResult;
39 import io.grpc.LoadBalancer.PickSubchannelArgs;
40 import io.grpc.LoadBalancer.Subchannel;
41 import io.grpc.LoadBalancer.SubchannelPicker;
42 import io.grpc.ManagedChannel;
43 import io.grpc.Metadata;
44 import io.grpc.Status;
45 import io.grpc.internal.BackoffPolicy;
46 import io.grpc.internal.GrpcAttributes;
47 import io.grpc.internal.TimeProvider;
48 import io.grpc.lb.v1.ClientStats;
49 import io.grpc.lb.v1.InitialLoadBalanceRequest;
50 import io.grpc.lb.v1.InitialLoadBalanceResponse;
51 import io.grpc.lb.v1.LoadBalanceRequest;
52 import io.grpc.lb.v1.LoadBalanceResponse;
53 import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase;
54 import io.grpc.lb.v1.LoadBalancerGrpc;
55 import io.grpc.lb.v1.Server;
56 import io.grpc.lb.v1.ServerList;
57 import io.grpc.stub.StreamObserver;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.SocketAddress;
61 import java.net.UnknownHostException;
62 import java.util.ArrayList;
63 import java.util.Arrays;
64 import java.util.Collections;
65 import java.util.HashMap;
66 import java.util.List;
67 import java.util.Map;
68 import java.util.Map.Entry;
69 import java.util.concurrent.ScheduledExecutorService;
70 import java.util.concurrent.ScheduledFuture;
71 import java.util.concurrent.TimeUnit;
72 import java.util.concurrent.atomic.AtomicReference;
73 import java.util.logging.Level;
74 import java.util.logging.Logger;
75 import javax.annotation.Nullable;
76 import javax.annotation.concurrent.NotThreadSafe;
77 
78 /**
79  * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}.  Created when
80  * GrpclbLoadBalancer switches to GRPCLB mode.  Closed and discarded when GrpclbLoadBalancer
81  * switches away from GRPCLB mode.
82  */
83 @NotThreadSafe
84 final class GrpclbState {
  private static final Logger logger = Logger.getLogger(GrpclbState.class.getName());

  // Delay used by FallbackModeTask.schedule() when arming the fallback timer.
  static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
  // Attributes attached to every backend address built from a balancer-provided server list.
  private static final Attributes LB_PROVIDED_BACKEND_ATTRS =
      Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build();

  // The result returned by DropEntry.picked() for every dropped request.
  @VisibleForTesting
  static final PickResult DROP_PICK_RESULT =
      PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));

  // Placeholder entry whose pick yields "no result".  It is the only pick-list entry of the
  // initial picker and of the picker used while no Subchannel is ready but some may still connect.
  @VisibleForTesting
  static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
      @Override
      public PickResult picked(Metadata headers) {
        return PickResult.withNoResult();
      }

      @Override
      public String toString() {
        return "BUFFER_ENTRY";
      }
    };

  // Collaborators supplied to (or derived in) the constructor.
  private final InternalLogId logId;
  // The authority reported by the helper; sent as the name in the initial LB request.
  private final String serviceName;
  private final Helper helper;
  private final SubchannelPool subchannelPool;
  private final TimeProvider time;
  private final ScheduledExecutorService timerService;

  // Key of the Subchannel attribute that holds the latest connectivity state of that Subchannel.
  private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
      Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
  private final BackoffPolicy.Provider backoffPolicyProvider;

  // Scheduled only once.  Never reset.
  @Nullable
  private FallbackModeTask fallbackTimer;
  // Backend addresses from the resolver, used only when falling back.
  private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
  // True while the round-robin lists are populated from fallbackBackendList.
  private boolean usingFallbackBackends;
  // True if the current balancer has returned a serverlist.  Will be reset to false when lost
  // connection to a balancer.
  private boolean balancerWorking;
  // Back-off sequence for restarting the LB RPC; created lazily when a stream closes.
  @Nullable
  private BackoffPolicy lbRpcRetryPolicy;
  // The most recently scheduled delayed restart of the LB RPC, if any.
  @Nullable
  private LbRpcRetryTask lbRpcRetryTimer;
  // Value of time.currentTimeNanos() taken when the latest LB RPC was started.
  private long prevLbRpcStartNanos;

  // Out-of-band channel to the balancer; null until the first LB address is handled.
  @Nullable
  private ManagedChannel lbCommChannel;

  // The active LB RPC, or null when there is none.
  @Nullable
  private LbStream lbStream;
  // Subchannels currently in use, keyed by backend address.
  private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap();

  // Has the same size as the round-robin list from the balancer.
  // A drop entry from the round-robin list becomes a DropEntry here.
  // A backend entry from the round-robin list becomes a null here.
  private List<DropEntry> dropList = Collections.emptyList();
  // Contains only non-drop, i.e., backends from the round-robin list from the balancer.
  private List<BackendEntry> backendList = Collections.emptyList();
  // The picker most recently handed to the helper (initially a buffering picker that was never
  // handed out).
  private RoundRobinPicker currentPicker =
      new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
148 
GrpclbState( Helper helper, SubchannelPool subchannelPool, TimeProvider time, ScheduledExecutorService timerService, BackoffPolicy.Provider backoffPolicyProvider, InternalLogId logId)149   GrpclbState(
150       Helper helper,
151       SubchannelPool subchannelPool,
152       TimeProvider time,
153       ScheduledExecutorService timerService,
154       BackoffPolicy.Provider backoffPolicyProvider,
155       InternalLogId logId) {
156     this.helper = checkNotNull(helper, "helper");
157     this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
158     this.time = checkNotNull(time, "time provider");
159     this.timerService = checkNotNull(timerService, "timerService");
160     this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
161     this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
162     this.logId = checkNotNull(logId, "logId");
163   }
164 
handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState)165   void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
166     if (newState.getState() == SHUTDOWN || !(subchannels.values().contains(subchannel))) {
167       return;
168     }
169     if (newState.getState() == IDLE) {
170       subchannel.requestConnection();
171     }
172     subchannel.getAttributes().get(STATE_INFO).set(newState);
173     maybeUseFallbackBackends();
174     maybeUpdatePicker();
175   }
176 
177   /**
178    * Handle new addresses of the balancer and backends from the resolver, and create connection if
179    * not yet connected.
180    */
handleAddresses( List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers)181   void handleAddresses(
182       List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
183     if (newLbAddressGroups.isEmpty()) {
184       propagateError(Status.UNAVAILABLE.withDescription(
185               "NameResolver returned no LB address while asking for GRPCLB"));
186       return;
187     }
188     LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
189     startLbComm(newLbAddressGroup);
190     // Avoid creating a new RPC just because the addresses were updated, as it can cause a
191     // stampeding herd. The current RPC may be on a connection to an address not present in
192     // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
193     // outdated backend, we could choose to re-create the RPC.
194     if (lbStream == null) {
195       startLbRpc();
196     }
197     fallbackBackendList = newBackendServers;
198     // Start the fallback timer if it's never started
199     if (fallbackTimer == null) {
200       logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId});
201       fallbackTimer = new FallbackModeTask();
202       fallbackTimer.schedule();
203     }
204     if (usingFallbackBackends) {
205       // Populate the new fallback backends to round-robin list.
206       useFallbackBackends();
207     }
208     maybeUpdatePicker();
209   }
210 
maybeUseFallbackBackends()211   private void maybeUseFallbackBackends() {
212     if (balancerWorking) {
213       return;
214     }
215     if (usingFallbackBackends) {
216       return;
217     }
218     if (fallbackTimer != null && !fallbackTimer.discarded) {
219       return;
220     }
221     int numReadySubchannels = 0;
222     for (Subchannel subchannel : subchannels.values()) {
223       if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
224         numReadySubchannels++;
225       }
226     }
227     if (numReadySubchannels > 0) {
228       return;
229     }
230     // Fallback contiditions met
231     useFallbackBackends();
232   }
233 
234   /**
235    * Populate the round-robin lists with the fallback backends.
236    */
useFallbackBackends()237   private void useFallbackBackends() {
238     usingFallbackBackends = true;
239     logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList});
240 
241     List<DropEntry> newDropList = new ArrayList<>();
242     List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
243     for (EquivalentAddressGroup eag : fallbackBackendList) {
244       newDropList.add(null);
245       newBackendAddrList.add(new BackendAddressGroup(eag, null));
246     }
247     useRoundRobinLists(newDropList, newBackendAddrList, null);
248   }
249 
shutdownLbComm()250   private void shutdownLbComm() {
251     if (lbCommChannel != null) {
252       lbCommChannel.shutdown();
253       lbCommChannel = null;
254     }
255     shutdownLbRpc();
256   }
257 
shutdownLbRpc()258   private void shutdownLbRpc() {
259     if (lbStream != null) {
260       lbStream.close(null);
261       // lbStream will be set to null in LbStream.cleanup()
262     }
263   }
264 
startLbComm(LbAddressGroup lbAddressGroup)265   private void startLbComm(LbAddressGroup lbAddressGroup) {
266     checkNotNull(lbAddressGroup, "lbAddressGroup");
267     if (lbCommChannel == null) {
268       lbCommChannel = helper.createOobChannel(
269           lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority());
270     } else if (lbAddressGroup.getAuthority().equals(lbCommChannel.authority())) {
271       helper.updateOobChannelAddresses(lbCommChannel, lbAddressGroup.getAddresses());
272     } else {
273       // Full restart of channel
274       shutdownLbComm();
275       lbCommChannel = helper.createOobChannel(
276           lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority());
277     }
278   }
279 
startLbRpc()280   private void startLbRpc() {
281     checkState(lbStream == null, "previous lbStream has not been cleared yet");
282     LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
283     lbStream = new LbStream(stub);
284     lbStream.start();
285     prevLbRpcStartNanos = time.currentTimeNanos();
286 
287     LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
288         .setInitialRequest(InitialLoadBalanceRequest.newBuilder()
289             .setName(serviceName).build())
290         .build();
291     try {
292       lbStream.lbRequestWriter.onNext(initRequest);
293     } catch (Exception e) {
294       lbStream.close(e);
295     }
296   }
297 
cancelFallbackTimer()298   private void cancelFallbackTimer() {
299     if (fallbackTimer != null) {
300       fallbackTimer.cancel();
301     }
302   }
303 
cancelLbRpcRetryTimer()304   private void cancelLbRpcRetryTimer() {
305     if (lbRpcRetryTimer != null) {
306       lbRpcRetryTimer.cancel();
307     }
308   }
309 
shutdown()310   void shutdown() {
311     shutdownLbComm();
312     // We close the subchannels through subchannelPool instead of helper just for convenience of
313     // testing.
314     for (Subchannel subchannel : subchannels.values()) {
315       subchannelPool.returnSubchannel(subchannel);
316     }
317     subchannels = Collections.emptyMap();
318     subchannelPool.clear();
319     cancelFallbackTimer();
320     cancelLbRpcRetryTimer();
321   }
322 
propagateError(Status status)323   void propagateError(Status status) {
324     logger.log(Level.FINE, "[{0}] Had an error: {1}; dropList={2}; backendList={3}",
325         new Object[] {logId, status, dropList, backendList});
326     if (backendList.isEmpty()) {
327       maybeUpdatePicker(
328           TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status))));
329     }
330   }
331 
332   @VisibleForTesting
333   @Nullable
getLoadRecorder()334   GrpclbClientLoadRecorder getLoadRecorder() {
335     if (lbStream == null) {
336       return null;
337     }
338     return lbStream.loadRecorder;
339   }
340 
341   /**
342    * Populate the round-robin lists with the given values.
343    */
useRoundRobinLists( List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder)344   private void useRoundRobinLists(
345       List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
346       @Nullable GrpclbClientLoadRecorder loadRecorder) {
347     logger.log(Level.FINE, "[{0}] Using round-robin list: {1}, droplist={2}",
348          new Object[] {logId, newBackendAddrList, newDropList});
349     HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
350         new HashMap<EquivalentAddressGroup, Subchannel>();
351     List<BackendEntry> newBackendList = new ArrayList<>();
352 
353     for (BackendAddressGroup backendAddr : newBackendAddrList) {
354       EquivalentAddressGroup eag = backendAddr.getAddresses();
355       Subchannel subchannel = newSubchannelMap.get(eag);
356       if (subchannel == null) {
357         subchannel = subchannels.get(eag);
358         if (subchannel == null) {
359           Attributes subchannelAttrs = Attributes.newBuilder()
360               .set(STATE_INFO,
361                   new AtomicReference<ConnectivityStateInfo>(
362                       ConnectivityStateInfo.forNonError(IDLE)))
363               .build();
364           subchannel = subchannelPool.takeOrCreateSubchannel(eag, subchannelAttrs);
365           subchannel.requestConnection();
366         }
367         newSubchannelMap.put(eag, subchannel);
368       }
369       BackendEntry entry;
370       // Only picks with tokens are reported to LoadRecorder
371       if (backendAddr.getToken() == null) {
372         entry = new BackendEntry(subchannel);
373       } else {
374         entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken());
375       }
376       newBackendList.add(entry);
377     }
378 
379     // Close Subchannels whose addresses have been delisted
380     for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
381       EquivalentAddressGroup eag = entry.getKey();
382       if (!newSubchannelMap.containsKey(eag)) {
383         subchannelPool.returnSubchannel(entry.getValue());
384       }
385     }
386 
387     subchannels = Collections.unmodifiableMap(newSubchannelMap);
388     dropList = Collections.unmodifiableList(newDropList);
389     backendList = Collections.unmodifiableList(newBackendList);
390   }
391 
392   @VisibleForTesting
393   class FallbackModeTask implements Runnable {
394     private ScheduledFuture<?> scheduledFuture;
395     private boolean discarded;
396 
397     @Override
run()398     public void run() {
399       helper.runSerialized(new Runnable() {
400           @Override
401           public void run() {
402             checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
403             discarded = true;
404             maybeUseFallbackBackends();
405             maybeUpdatePicker();
406           }
407         });
408     }
409 
cancel()410     void cancel() {
411       discarded = true;
412       scheduledFuture.cancel(false);
413     }
414 
schedule()415     void schedule() {
416       checkState(scheduledFuture == null, "FallbackModeTask already scheduled");
417       scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
418     }
419   }
420 
421   @VisibleForTesting
422   class LbRpcRetryTask implements Runnable {
423     private ScheduledFuture<?> scheduledFuture;
424 
425     @Override
run()426     public void run() {
427       helper.runSerialized(new Runnable() {
428           @Override
429           public void run() {
430             checkState(
431                 lbRpcRetryTimer == LbRpcRetryTask.this, "LbRpc retry timer mismatch");
432             startLbRpc();
433           }
434         });
435     }
436 
cancel()437     void cancel() {
438       scheduledFuture.cancel(false);
439     }
440 
schedule(long delayNanos)441     void schedule(long delayNanos) {
442       checkState(scheduledFuture == null, "LbRpcRetryTask already scheduled");
443       scheduledFuture = timerService.schedule(this, delayNanos, TimeUnit.NANOSECONDS);
444     }
445   }
446 
447   @VisibleForTesting
448   class LoadReportingTask implements Runnable {
449     private final LbStream stream;
450 
LoadReportingTask(LbStream stream)451     LoadReportingTask(LbStream stream) {
452       this.stream = stream;
453     }
454 
455     @Override
run()456     public void run() {
457       helper.runSerialized(new Runnable() {
458           @Override
459           public void run() {
460             stream.loadReportFuture = null;
461             stream.sendLoadReport();
462           }
463         });
464     }
465   }
466 
467   private class LbStream implements StreamObserver<LoadBalanceResponse> {
468     final GrpclbClientLoadRecorder loadRecorder;
469     final LoadBalancerGrpc.LoadBalancerStub stub;
470     StreamObserver<LoadBalanceRequest> lbRequestWriter;
471 
472     // These fields are only accessed from helper.runSerialized()
473     boolean initialResponseReceived;
474     boolean closed;
475     long loadReportIntervalMillis = -1;
476     ScheduledFuture<?> loadReportFuture;
477 
LbStream(LoadBalancerGrpc.LoadBalancerStub stub)478     LbStream(LoadBalancerGrpc.LoadBalancerStub stub) {
479       this.stub = checkNotNull(stub, "stub");
480       // Stats data only valid for current LbStream.  We do not carry over data from previous
481       // stream.
482       loadRecorder = new GrpclbClientLoadRecorder(time);
483     }
484 
start()485     void start() {
486       lbRequestWriter = stub.withWaitForReady().balanceLoad(this);
487     }
488 
onNext(final LoadBalanceResponse response)489     @Override public void onNext(final LoadBalanceResponse response) {
490       helper.runSerialized(new Runnable() {
491           @Override
492           public void run() {
493             handleResponse(response);
494           }
495         });
496     }
497 
onError(final Throwable error)498     @Override public void onError(final Throwable error) {
499       helper.runSerialized(new Runnable() {
500           @Override
501           public void run() {
502             handleStreamClosed(Status.fromThrowable(error)
503                 .augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
504           }
505         });
506     }
507 
onCompleted()508     @Override public void onCompleted() {
509       helper.runSerialized(new Runnable() {
510           @Override
511           public void run() {
512             handleStreamClosed(
513                 Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed"));
514           }
515         });
516     }
517 
518     // Following methods must be run in helper.runSerialized()
519 
sendLoadReport()520     private void sendLoadReport() {
521       if (closed) {
522         return;
523       }
524       ClientStats stats = loadRecorder.generateLoadReport();
525       // TODO(zhangkun83): flow control?
526       try {
527         lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build());
528         scheduleNextLoadReport();
529       } catch (Exception e) {
530         close(e);
531       }
532     }
533 
scheduleNextLoadReport()534     private void scheduleNextLoadReport() {
535       if (loadReportIntervalMillis > 0) {
536         loadReportFuture = timerService.schedule(
537             new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS);
538       }
539     }
540 
handleResponse(LoadBalanceResponse response)541     private void handleResponse(LoadBalanceResponse response) {
542       if (closed) {
543         return;
544       }
545       logger.log(Level.FINER, "[{0}] Got an LB response: {1}", new Object[] {logId, response});
546 
547       LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase();
548       if (!initialResponseReceived) {
549         if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) {
550           logger.log(
551               Level.WARNING,
552               "[{0}] : Did not receive response with type initial response: {1}",
553               new Object[] {logId, response});
554           return;
555         }
556         initialResponseReceived = true;
557         InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
558         loadReportIntervalMillis =
559             Durations.toMillis(initialResponse.getClientStatsReportInterval());
560         scheduleNextLoadReport();
561         return;
562       }
563 
564       if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
565         logger.log(
566             Level.WARNING,
567             "[{0}] : Ignoring unexpected response type: {1}",
568             new Object[] {logId, response});
569         return;
570       }
571 
572       balancerWorking = true;
573       // TODO(zhangkun83): handle delegate from initialResponse
574       ServerList serverList = response.getServerList();
575       List<DropEntry> newDropList = new ArrayList<>();
576       List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
577       // Construct the new collections. Create new Subchannels when necessary.
578       for (Server server : serverList.getServersList()) {
579         String token = server.getLoadBalanceToken();
580         if (server.getDrop()) {
581           newDropList.add(new DropEntry(loadRecorder, token));
582         } else {
583           newDropList.add(null);
584           InetSocketAddress address;
585           try {
586             address = new InetSocketAddress(
587                 InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
588           } catch (UnknownHostException e) {
589             propagateError(
590                 Status.UNAVAILABLE
591                     .withDescription("Host for server not found: " + server)
592                     .withCause(e));
593             continue;
594           }
595           // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of
596           // TLS, with Netty.
597           EquivalentAddressGroup eag =
598               new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS);
599           newBackendAddrList.add(new BackendAddressGroup(eag, token));
600         }
601       }
602       // Stop using fallback backends as soon as a new server list is received from the balancer.
603       usingFallbackBackends = false;
604       cancelFallbackTimer();
605       useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
606       maybeUpdatePicker();
607     }
608 
handleStreamClosed(Status error)609     private void handleStreamClosed(Status error) {
610       checkArgument(!error.isOk(), "unexpected OK status");
611       if (closed) {
612         return;
613       }
614       closed = true;
615       cleanUp();
616       propagateError(error);
617       balancerWorking = false;
618       maybeUseFallbackBackends();
619       maybeUpdatePicker();
620 
621       long delayNanos = 0;
622       if (initialResponseReceived || lbRpcRetryPolicy == null) {
623         // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
624         // has never been initialized.
625         lbRpcRetryPolicy = backoffPolicyProvider.get();
626       }
627       // Backoff only when balancer wasn't working previously.
628       if (!initialResponseReceived) {
629         // The back-off policy determines the interval between consecutive RPC upstarts, thus the
630         // actual delay may be smaller than the value from the back-off policy, or even negative,
631         // depending how much time was spent in the previous RPC.
632         delayNanos =
633             prevLbRpcStartNanos + lbRpcRetryPolicy.nextBackoffNanos() - time.currentTimeNanos();
634       }
635       if (delayNanos <= 0) {
636         startLbRpc();
637       } else {
638         lbRpcRetryTimer = new LbRpcRetryTask();
639         lbRpcRetryTimer.schedule(delayNanos);
640       }
641     }
642 
close(@ullable Exception error)643     void close(@Nullable Exception error) {
644       if (closed) {
645         return;
646       }
647       closed = true;
648       cleanUp();
649       try {
650         if (error == null) {
651           lbRequestWriter.onCompleted();
652         } else {
653           lbRequestWriter.onError(error);
654         }
655       } catch (Exception e) {
656         // Don't care
657       }
658     }
659 
cleanUp()660     private void cleanUp() {
661       if (loadReportFuture != null) {
662         loadReportFuture.cancel(false);
663         loadReportFuture = null;
664       }
665       if (lbStream == this) {
666         lbStream = null;
667       }
668     }
669   }
670 
671   /**
672    * Make and use a picker out of the current lists and the states of subchannels if they have
673    * changed since the last picker created.
674    */
maybeUpdatePicker()675   private void maybeUpdatePicker() {
676     List<RoundRobinEntry> pickList = new ArrayList<>(backendList.size());
677     Status error = null;
678     boolean hasIdle = false;
679     for (BackendEntry entry : backendList) {
680       Subchannel subchannel = entry.result.getSubchannel();
681       Attributes attrs = subchannel.getAttributes();
682       ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
683       if (stateInfo.getState() == READY) {
684         pickList.add(entry);
685       } else if (stateInfo.getState() == TRANSIENT_FAILURE) {
686         error = stateInfo.getStatus();
687       } else if (stateInfo.getState() == IDLE) {
688         hasIdle = true;
689       }
690     }
691     ConnectivityState state;
692     if (pickList.isEmpty()) {
693       if (error != null && !hasIdle) {
694         logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
695             new Object[] {logId, error});
696         pickList.add(new ErrorEntry(error));
697         state = TRANSIENT_FAILURE;
698       } else {
699         logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId);
700         pickList.add(BUFFER_ENTRY);
701         state = CONNECTING;
702       }
703     } else {
704       logger.log(
705           Level.FINE, "[{0}] Using drop list {1} and pick list {2}",
706           new Object[] {logId, dropList, pickList});
707       state = READY;
708     }
709     maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
710   }
711 
712   /**
713    * Update the given picker to the helper if it's different from the current one.
714    */
maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)715   private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) {
716     // Discard the new picker if we are sure it won't make any difference, in order to save
717     // re-processing pending streams, and avoid unnecessary resetting of the pointer in
718     // RoundRobinPicker.
719     if (picker.dropList.equals(currentPicker.dropList)
720         && picker.pickList.equals(currentPicker.pickList)) {
721       return;
722     }
723     // No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending
724     // stream thus no time is wasted in re-process.
725     currentPicker = picker;
726     helper.updateBalancingState(state, picker);
727   }
728 
flattenLbAddressGroups(List<LbAddressGroup> groupList)729   private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) {
730     assert !groupList.isEmpty();
731     List<EquivalentAddressGroup> eags = new ArrayList<>(groupList.size());
732     String authority = groupList.get(0).getAuthority();
733     for (LbAddressGroup group : groupList) {
734       if (!authority.equals(group.getAuthority())) {
735         // TODO(ejona): Allow different authorities for different addresses. Requires support from
736         // Helper.
737         logger.log(Level.WARNING,
738             "[{0}] Multiple authorities found for LB. "
739             + "Skipping addresses for {0} in preference to {1}",
740             new Object[] {logId, group.getAuthority(), authority});
741       } else {
742         eags.add(group.getAddresses());
743       }
744     }
745     // ALTS code can use the presence of ATTR_LB_ADDR_AUTHORITY to select ALTS instead of TLS, with
746     // Netty.
747     // TODO(ejona): The process here is a bit of a hack because ATTR_LB_ADDR_AUTHORITY isn't
748     // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow
749     // this to be more obvious.
750     Attributes attrs = Attributes.newBuilder()
751         .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
752         .build();
753     return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority);
754   }
755 
756   /**
757    * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
758    */
flattenEquivalentAddressGroup( List<EquivalentAddressGroup> groupList, Attributes attrs)759   private static EquivalentAddressGroup flattenEquivalentAddressGroup(
760       List<EquivalentAddressGroup> groupList, Attributes attrs) {
761     List<SocketAddress> addrs = new ArrayList<>();
762     for (EquivalentAddressGroup group : groupList) {
763       addrs.addAll(group.getAddresses());
764     }
765     return new EquivalentAddressGroup(addrs, attrs);
766   }
767 
768   @VisibleForTesting
769   static final class DropEntry {
770     private final GrpclbClientLoadRecorder loadRecorder;
771     private final String token;
772 
DropEntry(GrpclbClientLoadRecorder loadRecorder, String token)773     DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
774       this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
775       this.token = checkNotNull(token, "token");
776     }
777 
picked()778     PickResult picked() {
779       loadRecorder.recordDroppedRequest(token);
780       return DROP_PICK_RESULT;
781     }
782 
783     @Override
toString()784     public String toString() {
785       return MoreObjects.toStringHelper(this)
786           .add("loadRecorder", loadRecorder)
787           .add("token", token)
788           .toString();
789     }
790 
791     @Override
hashCode()792     public int hashCode() {
793       return Objects.hashCode(loadRecorder, token);
794     }
795 
796     @Override
equals(Object other)797     public boolean equals(Object other) {
798       if (!(other instanceof DropEntry)) {
799         return false;
800       }
801       DropEntry that = (DropEntry) other;
802       return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token);
803     }
804   }
805 
  /**
   * An element of the pick list that RoundRobinPicker rotates through.
   */
  private interface RoundRobinEntry {
    /**
     * Returns the PickResult for this entry. Implementations may modify {@code headers}.
     */
    PickResult picked(Metadata headers);
  }
809 
810   @VisibleForTesting
811   static final class BackendEntry implements RoundRobinEntry {
812     @VisibleForTesting
813     final PickResult result;
814     @Nullable
815     private final GrpclbClientLoadRecorder loadRecorder;
816     @Nullable
817     private final String token;
818 
819     /**
820      * Creates a BackendEntry whose usage will be reported to load recorder.
821      */
BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token)822     BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
823       this.result = PickResult.withSubchannel(subchannel, loadRecorder);
824       this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
825       this.token = checkNotNull(token, "token");
826     }
827 
828     /**
829      * Creates a BackendEntry whose usage will not be reported.
830      */
BackendEntry(Subchannel subchannel)831     BackendEntry(Subchannel subchannel) {
832       this.result = PickResult.withSubchannel(subchannel);
833       this.loadRecorder = null;
834       this.token = null;
835     }
836 
837     @Override
picked(Metadata headers)838     public PickResult picked(Metadata headers) {
839       headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
840       if (token != null) {
841         headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
842       }
843       return result;
844     }
845 
846     @Override
toString()847     public String toString() {
848       return MoreObjects.toStringHelper(this)
849           .add("result", result)
850           .add("loadRecorder", loadRecorder)
851           .add("token", token)
852           .toString();
853     }
854 
855     @Override
hashCode()856     public int hashCode() {
857       return Objects.hashCode(loadRecorder, result, token);
858     }
859 
860     @Override
equals(Object other)861     public boolean equals(Object other) {
862       if (!(other instanceof BackendEntry)) {
863         return false;
864       }
865       BackendEntry that = (BackendEntry) other;
866       return Objects.equal(result, that.result) && Objects.equal(token, that.token)
867           && Objects.equal(loadRecorder, that.loadRecorder);
868     }
869   }
870 
871   @VisibleForTesting
872   static final class ErrorEntry implements RoundRobinEntry {
873     final PickResult result;
874 
ErrorEntry(Status status)875     ErrorEntry(Status status) {
876       result = PickResult.withError(status);
877     }
878 
879     @Override
picked(Metadata headers)880     public PickResult picked(Metadata headers) {
881       return result;
882     }
883 
884     @Override
hashCode()885     public int hashCode() {
886       return Objects.hashCode(result);
887     }
888 
889     @Override
equals(Object other)890     public boolean equals(Object other) {
891       if (!(other instanceof ErrorEntry)) {
892         return false;
893       }
894       return Objects.equal(result, ((ErrorEntry) other).result);
895     }
896 
897     @Override
toString()898     public String toString() {
899       return MoreObjects.toStringHelper(this)
900           .add("result", result)
901           .toString();
902     }
903   }
904 
905   @VisibleForTesting
906   static final class RoundRobinPicker extends SubchannelPicker {
907     @VisibleForTesting
908     final List<DropEntry> dropList;
909     private int dropIndex;
910 
911     @VisibleForTesting
912     final List<? extends RoundRobinEntry> pickList;
913     private int pickIndex;
914 
915     // dropList can be empty, which means no drop.
916     // pickList must not be empty.
RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList)917     RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) {
918       this.dropList = checkNotNull(dropList, "dropList");
919       this.pickList = checkNotNull(pickList, "pickList");
920       checkArgument(!pickList.isEmpty(), "pickList is empty");
921     }
922 
923     @Override
pickSubchannel(PickSubchannelArgs args)924     public PickResult pickSubchannel(PickSubchannelArgs args) {
925       synchronized (pickList) {
926         // Two-level round-robin.
927         // First round-robin on dropList. If a drop entry is selected, request will be dropped.  If
928         // a non-drop entry is selected, then round-robin on pickList.  This makes sure requests are
929         // dropped at the same proportion as the drop entries appear on the round-robin list from
930         // the balancer, while only READY backends (that make up pickList) are selected for the
931         // non-drop cases.
932         if (!dropList.isEmpty()) {
933           DropEntry drop = dropList.get(dropIndex);
934           dropIndex++;
935           if (dropIndex == dropList.size()) {
936             dropIndex = 0;
937           }
938           if (drop != null) {
939             return drop.picked();
940           }
941         }
942 
943         RoundRobinEntry pick = pickList.get(pickIndex);
944         pickIndex++;
945         if (pickIndex == pickList.size()) {
946           pickIndex = 0;
947         }
948         return pick.picked(args.getHeaders());
949       }
950     }
951   }
952 }
953