1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Objects;
24 import io.grpc.Attributes;
25 import io.grpc.CallOptions;
26 import io.grpc.ClientStreamTracer;
27 import io.grpc.Compressor;
28 import io.grpc.Deadline;
29 import io.grpc.DecompressorRegistry;
30 import io.grpc.Metadata;
31 import io.grpc.MethodDescriptor;
32 import io.grpc.Status;
33 import java.io.InputStream;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Random;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.concurrent.atomic.AtomicLong;
45 import javax.annotation.CheckReturnValue;
46 import javax.annotation.Nullable;
47 import javax.annotation.concurrent.GuardedBy;
48 
49 /** A logical {@link ClientStream} that is retriable. */
50 abstract class RetriableStream<ReqT> implements ClientStream {
  /** Header added to every attempt after the first, carrying the number of earlier attempts. */
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);

  /** Trailer read when deciding whether (and how long after) a failed attempt is retried. */
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);

  /** Status used to cancel attempts that lost once the logical stream committed elsewhere. */
  private static final Status CANCELLED_BECAUSE_COMMITTED =
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");

  private final MethodDescriptor<ReqT, ?> method;
  private final Executor callExecutor;
  private final ScheduledExecutorService scheduledExecutorService;
  // Must not modify it. Each attempt gets its own copy via updateHeaders().
  private final Metadata headers;
  private final RetryPolicy.Provider retryPolicyProvider;
  private final HedgingPolicy.Provider hedgingPolicyProvider;
  // Obtained lazily from retryPolicyProvider the first time a non-transparent retry is evaluated.
  private RetryPolicy retryPolicy;

  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
  private final Object lock = new Object();

  private final ChannelBufferMeter channelBufferUsed;
  private final long perRpcBufferLimit;
  private final long channelBufferLimit;
  @Nullable
  private final Throttle throttle;

  // Replaced (never mutated field-by-field) while holding lock; volatile so readers outside the
  // lock see the latest snapshot. Starts with an empty buffer, no drained attempts, uncommitted.
  private volatile State state = new State(
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, false, false);

  /**
   * Either transparent retry happened or reached server's application logic.
   * Once true, a REFUSED close no longer triggers a transparent retry.
   */
  private boolean noMoreTransparentRetry;

  // Used for recording the share of buffer used for the current call out of the channel buffer.
  // This field would not be necessary if there is no channel buffer limit.
  // Subtracted back out of channelBufferUsed when the stream commits.
  @GuardedBy("lock")
  private long perRpcBufferUsed;

  // The caller's listener, set in start(); receives events from the winning attempt only.
  private ClientStreamListener masterListener;
  // Pending backoff retry, if any; cancelled by cancel().
  private Future<?> scheduledRetry;
  // Upper bound of the next jittered backoff; grows by backoffMultiplier up to maxBackoffNanos.
  private long nextBackoffIntervalNanos;
96 
RetriableStream( MethodDescriptor<ReqT, ?> method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, Executor callExecutor, ScheduledExecutorService scheduledExecutorService, RetryPolicy.Provider retryPolicyProvider, HedgingPolicy.Provider hedgingPolicyProvider, @Nullable Throttle throttle)97   RetriableStream(
98       MethodDescriptor<ReqT, ?> method, Metadata headers,
99       ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
100       Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
101       RetryPolicy.Provider retryPolicyProvider, HedgingPolicy.Provider hedgingPolicyProvider,
102       @Nullable Throttle throttle) {
103     this.method = method;
104     this.channelBufferUsed = channelBufferUsed;
105     this.perRpcBufferLimit = perRpcBufferLimit;
106     this.channelBufferLimit = channelBufferLimit;
107     this.callExecutor = callExecutor;
108     this.scheduledExecutorService = scheduledExecutorService;
109     this.headers = headers;
110     this.retryPolicyProvider = checkNotNull(retryPolicyProvider, "retryPolicyProvider");
111     this.hedgingPolicyProvider = checkNotNull(hedgingPolicyProvider, "hedgingPolicyProvider");
112     this.throttle = throttle;
113   }
114 
115   @Nullable // null if already committed
116   @CheckReturnValue
commit(final Substream winningSubstream)117   private Runnable commit(final Substream winningSubstream) {
118     synchronized (lock) {
119       if (state.winningSubstream != null) {
120         return null;
121       }
122       final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;
123 
124       state = state.committed(winningSubstream);
125 
126       // subtract the share of this RPC from channelBufferUsed.
127       channelBufferUsed.addAndGet(-perRpcBufferUsed);
128 
129       class CommitTask implements Runnable {
130         @Override
131         public void run() {
132           // For hedging only, not needed for normal retry
133           // TODO(zdapeng): also cancel all the scheduled hedges.
134           for (Substream substream : savedDrainedSubstreams) {
135             if (substream != winningSubstream) {
136               substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
137             }
138           }
139 
140           postCommit();
141         }
142       }
143 
144       return new CommitTask();
145     }
146   }
147 
postCommit()148   abstract void postCommit();
149 
150   /**
151    * Calls commit() and if successful runs the post commit task.
152    */
commitAndRun(Substream winningSubstream)153   private void commitAndRun(Substream winningSubstream) {
154     Runnable postCommitTask = commit(winningSubstream);
155 
156     if (postCommitTask != null) {
157       postCommitTask.run();
158     }
159   }
160 
161 
createSubstream(int previousAttempts)162   private Substream createSubstream(int previousAttempts) {
163     Substream sub = new Substream(previousAttempts);
164     // one tracer per substream
165     final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
166     ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
167       @Override
168       public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
169         return bufferSizeTracer;
170       }
171     };
172 
173     Metadata newHeaders = updateHeaders(headers, previousAttempts);
174     // NOTICE: This set _must_ be done before stream.start() and it actually is.
175     sub.stream = newSubstream(tracerFactory, newHeaders);
176     return sub;
177   }
178 
  /**
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
   * Client stream is not yet started.
   *
   * @param tracerFactory factory created by createSubstream that returns the attempt's dedicated
   *     buffer-size tracer; presumably implementations install it on the new stream so buffer
   *     usage is tracked (confirm against implementations)
   * @param headers this attempt's private copy of the call headers, already carrying
   *     grpc-previous-rpc-attempts when this is not the first attempt
   */
  abstract ClientStream newSubstream(
      ClientStreamTracer.Factory tracerFactory, Metadata headers);
185 
186   /** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
187   @VisibleForTesting
updateHeaders( Metadata originalHeaders, int previousAttempts)188   final Metadata updateHeaders(
189       Metadata originalHeaders, int previousAttempts) {
190     Metadata newHeaders = new Metadata();
191     newHeaders.merge(originalHeaders);
192     if (previousAttempts > 0) {
193       newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttempts));
194     }
195     return newHeaders;
196   }
197 
  /**
   * Replays the buffered stream operations onto {@code substream} until it has caught up.
   *
   * <p>Entries are copied out of {@code state.buffer} in chunks of at most 0x80 while holding
   * {@code lock}, then replayed without the lock so that new operations can keep being buffered.
   * When the replay reaches the end of the buffer, the substream is recorded as drained (under the
   * lock); from then on delayOrExecute delivers new operations to it directly.
   *
   * <p>Replay stops early when:
   * <ul>
   *   <li>the stream committed to a different substream: this one is then cancelled with
   *       CANCELLED_BECAUSE_COMMITTED;</li>
   *   <li>this substream has already closed; or</li>
   *   <li>the stream was cancelled after committing to this substream.</li>
   * </ul>
   */
  private void drain(Substream substream) {
    int index = 0;
    int chunk = 0x80;
    // Reused between iterations to avoid reallocating the chunk copy.
    List<BufferEntry> list = null;

    while (true) {
      State savedState;

      synchronized (lock) {
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me
          break;
        }
        if (index == savedState.buffer.size()) { // I'm drained
          state = savedState.substreamDrained(substream);
          return;
        }

        if (substream.closed) {
          return;
        }

        // Copy the next chunk while holding the lock; the buffer is appended to concurrently.
        int stop = Math.min(index + chunk, savedState.buffer.size());
        if (list == null) {
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
        } else {
          list.clear();
          list.addAll(savedState.buffer.subList(index, stop));
        }
        index = stop;
      }

      // Replay outside the lock, re-checking the latest state before each entry.
      for (BufferEntry bufferEntry : list) {
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me
          // Only leaves the for loop; the while loop re-checks under the lock and breaks out.
          break;
        }
        if (savedState.cancelled) {
          checkState(
              savedState.winningSubstream == substream,
              "substream should be CANCELLED_BECAUSE_COMMITTED already");
          return;
        }
        bufferEntry.runWith(substream);
      }
    }

    // Reached only via the "committed but not me" break inside the synchronized block.
    substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
  }
249 
  /**
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
   *
   * <p>Called by start() before any attempt is created. A non-null result makes start() cancel
   * the logical stream with that status instead of starting the first attempt.
   *
   * @return the shutdown status, or {@code null} to proceed with the first attempt
   */
  @CheckReturnValue
  @Nullable
  abstract Status prestart();
256 
257   /** Starts the first PRC attempt. */
258   @Override
start(ClientStreamListener listener)259   public final void start(ClientStreamListener listener) {
260     masterListener = listener;
261 
262     Status shutdownStatus = prestart();
263 
264     if (shutdownStatus != null) {
265       cancel(shutdownStatus);
266       return;
267     }
268 
269     class StartEntry implements BufferEntry {
270       @Override
271       public void runWith(Substream substream) {
272         substream.stream.start(new Sublistener(substream));
273       }
274     }
275 
276     synchronized (lock) {
277       state.buffer.add(new StartEntry());
278     }
279 
280     Substream substream = createSubstream(0);
281     drain(substream);
282 
283     // TODO(zdapeng): schedule hedging if needed
284   }
285 
286   @Override
cancel(Status reason)287   public final void cancel(Status reason) {
288     Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
289     noopSubstream.stream = new NoopClientStream();
290     Runnable runnable = commit(noopSubstream);
291 
292     if (runnable != null) {
293       Future<?> savedScheduledRetry = scheduledRetry;
294       if (savedScheduledRetry != null) {
295         savedScheduledRetry.cancel(false);
296         scheduledRetry = null;
297       }
298       masterListener.closed(reason, new Metadata());
299       runnable.run();
300       return;
301     }
302 
303     state.winningSubstream.stream.cancel(reason);
304     synchronized (lock) {
305       // This is not required, but causes a short-circuit in the draining process.
306       state = state.cancelled();
307     }
308   }
309 
delayOrExecute(BufferEntry bufferEntry)310   private void delayOrExecute(BufferEntry bufferEntry) {
311     Collection<Substream> savedDrainedSubstreams;
312     synchronized (lock) {
313       if (!state.passThrough) {
314         state.buffer.add(bufferEntry);
315       }
316       savedDrainedSubstreams = state.drainedSubstreams;
317     }
318 
319     for (Substream substream : savedDrainedSubstreams) {
320       bufferEntry.runWith(substream);
321     }
322   }
323 
324   /**
325    * Do not use it directly. Use {@link #sendMessage(ReqT)} instead because we don't use InputStream
326    * for buffering.
327    */
328   @Override
writeMessage(InputStream message)329   public final void writeMessage(InputStream message) {
330     throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
331   }
332 
sendMessage(final ReqT message)333   final void sendMessage(final ReqT message) {
334     State savedState = state;
335     if (savedState.passThrough) {
336       savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
337       return;
338     }
339 
340     class SendMessageEntry implements BufferEntry {
341       @Override
342       public void runWith(Substream substream) {
343         substream.stream.writeMessage(method.streamRequest(message));
344       }
345     }
346 
347     delayOrExecute(new SendMessageEntry());
348   }
349 
350   @Override
request(final int numMessages)351   public final void request(final int numMessages) {
352     State savedState = state;
353     if (savedState.passThrough) {
354       savedState.winningSubstream.stream.request(numMessages);
355       return;
356     }
357 
358     class RequestEntry implements BufferEntry {
359       @Override
360       public void runWith(Substream substream) {
361         substream.stream.request(numMessages);
362       }
363     }
364 
365     delayOrExecute(new RequestEntry());
366   }
367 
368   @Override
flush()369   public final void flush() {
370     State savedState = state;
371     if (savedState.passThrough) {
372       savedState.winningSubstream.stream.flush();
373       return;
374     }
375 
376     class FlushEntry implements BufferEntry {
377       @Override
378       public void runWith(Substream substream) {
379         substream.stream.flush();
380       }
381     }
382 
383     delayOrExecute(new FlushEntry());
384   }
385 
386   @Override
isReady()387   public final boolean isReady() {
388     for (Substream substream : state.drainedSubstreams) {
389       if (substream.stream.isReady()) {
390         return true;
391       }
392     }
393     return false;
394   }
395 
396   @Override
setCompressor(final Compressor compressor)397   public final void setCompressor(final Compressor compressor) {
398     class CompressorEntry implements BufferEntry {
399       @Override
400       public void runWith(Substream substream) {
401         substream.stream.setCompressor(compressor);
402       }
403     }
404 
405     delayOrExecute(new CompressorEntry());
406   }
407 
408   @Override
setFullStreamDecompression(final boolean fullStreamDecompression)409   public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
410     class FullStreamDecompressionEntry implements BufferEntry {
411       @Override
412       public void runWith(Substream substream) {
413         substream.stream.setFullStreamDecompression(fullStreamDecompression);
414       }
415     }
416 
417     delayOrExecute(new FullStreamDecompressionEntry());
418   }
419 
420   @Override
setMessageCompression(final boolean enable)421   public final void setMessageCompression(final boolean enable) {
422     class MessageCompressionEntry implements BufferEntry {
423       @Override
424       public void runWith(Substream substream) {
425         substream.stream.setMessageCompression(enable);
426       }
427     }
428 
429     delayOrExecute(new MessageCompressionEntry());
430   }
431 
432   @Override
halfClose()433   public final void halfClose() {
434     class HalfCloseEntry implements BufferEntry {
435       @Override
436       public void runWith(Substream substream) {
437         substream.stream.halfClose();
438       }
439     }
440 
441     delayOrExecute(new HalfCloseEntry());
442   }
443 
444   @Override
setAuthority(final String authority)445   public final void setAuthority(final String authority) {
446     class AuthorityEntry implements BufferEntry {
447       @Override
448       public void runWith(Substream substream) {
449         substream.stream.setAuthority(authority);
450       }
451     }
452 
453     delayOrExecute(new AuthorityEntry());
454   }
455 
456   @Override
setDecompressorRegistry(final DecompressorRegistry decompressorRegistry)457   public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
458     class DecompressorRegistryEntry implements BufferEntry {
459       @Override
460       public void runWith(Substream substream) {
461         substream.stream.setDecompressorRegistry(decompressorRegistry);
462       }
463     }
464 
465     delayOrExecute(new DecompressorRegistryEntry());
466   }
467 
468   @Override
setMaxInboundMessageSize(final int maxSize)469   public final void setMaxInboundMessageSize(final int maxSize) {
470     class MaxInboundMessageSizeEntry implements BufferEntry {
471       @Override
472       public void runWith(Substream substream) {
473         substream.stream.setMaxInboundMessageSize(maxSize);
474       }
475     }
476 
477     delayOrExecute(new MaxInboundMessageSizeEntry());
478   }
479 
480   @Override
setMaxOutboundMessageSize(final int maxSize)481   public final void setMaxOutboundMessageSize(final int maxSize) {
482     class MaxOutboundMessageSizeEntry implements BufferEntry {
483       @Override
484       public void runWith(Substream substream) {
485         substream.stream.setMaxOutboundMessageSize(maxSize);
486       }
487     }
488 
489     delayOrExecute(new MaxOutboundMessageSizeEntry());
490   }
491 
492   @Override
setDeadline(final Deadline deadline)493   public final void setDeadline(final Deadline deadline) {
494     class DeadlineEntry implements BufferEntry {
495       @Override
496       public void runWith(Substream substream) {
497         substream.stream.setDeadline(deadline);
498       }
499     }
500 
501     delayOrExecute(new DeadlineEntry());
502   }
503 
504   @Override
getAttributes()505   public final Attributes getAttributes() {
506     if (state.winningSubstream != null) {
507       return state.winningSubstream.stream.getAttributes();
508     }
509     return Attributes.EMPTY;
510   }
511 
512   private static Random random = new Random();
513 
514   @VisibleForTesting
setRandom(Random random)515   static void setRandom(Random random) {
516     RetriableStream.random = random;
517   }
518 
hasHedging()519   boolean hasHedging() {
520     return false;
521   }
522 
523   private interface BufferEntry {
524     /** Replays the buffer entry with the given stream. */
runWith(Substream substream)525     void runWith(Substream substream);
526   }
527 
  /**
   * Listener attached to a single physical attempt. It commits the logical stream to an attempt
   * when appropriate, decides whether a failed attempt is retried, and forwards the winning
   * attempt's events to {@code masterListener}.
   */
  private final class Sublistener implements ClientStreamListener {
    // The attempt this listener observes.
    final Substream substream;

    Sublistener(Substream substream) {
      this.substream = substream;
    }

    /**
     * Receiving headers commits the stream to this attempt (if nothing committed yet). Headers
     * are forwarded, and success is reported to the throttle, only if this attempt is the winner.
     */
    @Override
    public void headersRead(Metadata headers) {
      commitAndRun(substream);
      if (state.winningSubstream == substream) {
        masterListener.headersRead(headers);
        if (throttle != null) {
          throttle.onSuccess();
        }
      }
    }

    /** Treats a close without progress information as {@link RpcProgress#PROCESSED}. */
    @Override
    public void closed(Status status, Metadata trailers) {
      closed(status, RpcProgress.PROCESSED, trailers);
    }

    /**
     * Handles the end of this attempt.
     *
     * <p>The attempt is marked closed first. If the stream is still uncommitted, the attempt may
     * be retried:
     * <ul>
     *   <li>REFUSED and no transparent retry used yet: retried once immediately on callExecutor,
     *       keeping the same previousAttempts.</li>
     *   <li>DROPPED: never retried.</li>
     *   <li>Otherwise: the retry policy decides, and a retry is scheduled after a backoff.</li>
     * </ul>
     * If no retry is started (and hedging is off), the stream commits to this attempt and the
     * status/trailers are forwarded if it won.
     */
    @Override
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
      synchronized (lock) {
        state = state.substreamClosed(substream);
      }

      // handle a race between buffer limit exceeded and closed, when setting
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
      if (substream.bufferLimitExceeded) {
        commitAndRun(substream);
        if (state.winningSubstream == substream) {
          masterListener.closed(status, trailers);
        }
        return;
      }

      if (state.winningSubstream == null) {
        if (rpcProgress == RpcProgress.REFUSED && !noMoreTransparentRetry) {
          // TODO(zdapeng): in hedging case noMoreTransparentRetry might need be synchronized.
          noMoreTransparentRetry = true;
          callExecutor.execute(new Runnable() {
            @Override
            public void run() {
              // transparent retry
              // Same previousAttempts: a transparent retry does not count as a new attempt.
              Substream newSubstream = createSubstream(
                  substream.previousAttempts);
              drain(newSubstream);
            }
          });
          return;
        } else if (rpcProgress == RpcProgress.DROPPED) {
          // For normal retry, nothing need be done here, will just commit.
          // For hedging:
          // TODO(zdapeng): cancel all scheduled hedges (TBD)
        } else {
          noMoreTransparentRetry = true;

          // Fetch the policy lazily, once, and start the backoff sequence from its initial value.
          if (retryPolicy == null) {
            retryPolicy = retryPolicyProvider.get();
            nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
          }

          RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers);
          if (retryPlan.shouldRetry) {
            // The check state.winningSubstream == null, checking if is not already committed, is
            // racy, but is still safe b/c the retry will also handle committed/cancellation
            scheduledRetry = scheduledExecutorService.schedule(
                new Runnable() {
                  @Override
                  public void run() {
                    scheduledRetry = null;
                    // Hop to callExecutor so the new attempt is created and drained there.
                    callExecutor.execute(new Runnable() {
                      @Override
                      public void run() {
                        // retry
                        Substream newSubstream = createSubstream(substream.previousAttempts + 1);
                        drain(newSubstream);
                      }
                    });
                  }
                },
                retryPlan.backoffNanos,
                TimeUnit.NANOSECONDS);
            return;
          }
        }
      }

      if (!hasHedging()) {
        commitAndRun(substream);
        if (state.winningSubstream == substream) {
          masterListener.closed(status, trailers);
        }
      }

      // TODO(zdapeng): in hedge case, if this is a fatal status, cancel all the other attempts, and
      // close the masterListener.
    }

    /**
     * Decides in current situation whether or not the RPC should retry and if it should retry how
     * long the backoff should be. The decision does not take the commitment status into account, so
     * caller should check it separately.
     *
     * <p>Details:
     * <ul>
     *   <li>An unparsable grpc-retry-pushback-ms value is treated as -1. A negative pushback means
     *       "do not retry".</li>
     *   <li>If a throttle is configured, the failure is reported to it when the status code is
     *       retryable or the pushback is negative. A {@code false} answer from the throttle
     *       suppresses the retry.</li>
     *   <li>A retry needs {@code maxAttempts > previousAttempts + 1}, i.e. room for one more
     *       attempt after this one.</li>
     *   <li>Without pushback, a retryable code retries after a jittered delay in
     *       {@code [0, nextBackoffIntervalNanos)}, and the interval then grows by
     *       backoffMultiplier, capped at maxBackoffNanos.</li>
     *   <li>A non-negative pushback retries after exactly that many milliseconds and resets the
     *       interval to initialBackoffNanos.</li>
     * </ul>
     * Side effect: updates {@code nextBackoffIntervalNanos} whenever a retry is planned.
     */
    // TODO(zdapeng): add HedgingPolicy as param
    private RetryPlan makeRetryDecision(RetryPolicy retryPolicy, Status status, Metadata trailer) {
      boolean shouldRetry = false;
      long backoffNanos = 0L;
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());

      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
      Integer pushbackMillis = null;
      if (pushbackStr != null) {
        try {
          pushbackMillis = Integer.valueOf(pushbackStr);
        } catch (NumberFormatException e) {
          // Malformed pushback is treated like a negative one: no retry.
          pushbackMillis = -1;
        }
      }

      boolean isThrottled = false;
      if (throttle != null) {
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
        }
      }

      if (retryPolicy.maxAttempts > substream.previousAttempts + 1 && !isThrottled) {
        if (pushbackMillis == null) {
          if (isRetryableStatusCode) {
            shouldRetry = true;
            // random.nextDouble() is in [0.0, 1.0), giving full jitter over the current interval.
            backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
            nextBackoffIntervalNanos = Math.min(
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
                retryPolicy.maxBackoffNanos);

          } // else no retry
        } else if (pushbackMillis >= 0) {
          shouldRetry = true;
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
        } // else no retry
      } // else no retry

      // TODO(zdapeng): transparent retry
      // TODO(zdapeng): hedging
      return new RetryPlan(shouldRetry, backoffNanos);
    }

    /**
     * Forwards messages only from the winning attempt. Headers commit the stream, so being
     * uncommitted here violates the expected ordering and fails the checkState.
     */
    @Override
    public void messagesAvailable(MessageProducer producer) {
      State savedState = state;
      checkState(
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
      if (savedState.winningSubstream != substream) {
        return;
      }
      masterListener.messagesAvailable(producer);
    }

    /** Forwards readiness only while this attempt is among the drained attempts. */
    @Override
    public void onReady() {
      // TODO(zdapeng): the more correct way to handle onReady
      if (state.drainedSubstreams.contains(substream)) {
        masterListener.onReady();
      }
    }
  }
699 
  /**
   * Snapshot of the logical stream's retry bookkeeping.
   *
   * <p>All fields are final and every transition returns a new State, but the {@code buffer} list
   * is shared between successive snapshots and is appended to in place (under
   * RetriableStream.lock) until pass-through.
   */
  private static final class State {
    /** Committed and the winning substream drained. */
    final boolean passThrough;

    /** A list of buffered ClientStream runnables. Set to null once passThrough. */
    @Nullable final List<BufferEntry> buffer;

    /**
     * Unmodifiable collection of all the substreams that are drained. Exceptional cases: Singleton
     * once passThrough (or empty if the winning substream has since closed); Empty if committed
     * but not passThrough.
     */
    final Collection<Substream> drainedSubstreams;

    /** Null until committed. */
    @Nullable final Substream winningSubstream;

    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
    final boolean cancelled;

    /** Creates a snapshot and verifies the invariants between passThrough, cancelled and commit. */
    State(
        @Nullable List<BufferEntry> buffer,
        Collection<Substream> drainedSubstreams,
        @Nullable Substream winningSubstream,
        boolean cancelled,
        boolean passThrough) {
      this.buffer = buffer;
      this.drainedSubstreams =
          checkNotNull(drainedSubstreams, "drainedSubstreams");
      this.winningSubstream = winningSubstream;
      this.cancelled = cancelled;
      this.passThrough = passThrough;

      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
      checkState(
          !passThrough || winningSubstream != null,
          "passThrough should imply winningSubstream != null");
      checkState(
          !passThrough
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
          "passThrough should imply winningSubstream is drained");
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
    }

    /** Same snapshot with {@code cancelled} set. */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State cancelled() {
      return new State(buffer, drainedSubstreams, winningSubstream, true, passThrough);
    }

    /**
     * The given substream is drained.
     *
     * <p>A closed substream is not added to the drained set. If the stream is already committed,
     * the substream must be the winner; the result is then pass-through and the buffer is dropped.
     */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State substreamDrained(Substream substream) {
      checkState(!passThrough, "Already passThrough");

      Collection<Substream> drainedSubstreams;

      if (substream.closed) {
        drainedSubstreams = this.drainedSubstreams;
      } else if (this.drainedSubstreams.isEmpty()) {
        // optimize for 0-retry, which is most of the cases.
        drainedSubstreams = Collections.singletonList(substream);
      } else {
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
        drainedSubstreams.add(substream);
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
      }

      boolean passThrough = winningSubstream != null;

      List<BufferEntry> buffer = this.buffer;
      if (passThrough) {
        checkState(
            winningSubstream == substream, "Another RPC attempt has already committed");
        buffer = null;
      }

      return new State(buffer, drainedSubstreams, winningSubstream, cancelled, passThrough);
    }

    /**
     * The given substream is closed.
     *
     * <p>Side effect: sets {@code substream.closed = true}. Returns {@code this} unchanged if the
     * substream was not in the drained set; otherwise returns a copy without it.
     */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State substreamClosed(Substream substream) {
      substream.closed = true;
      if (this.drainedSubstreams.contains(substream)) {
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
        drainedSubstreams.remove(substream);
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
        return new State(buffer, drainedSubstreams, winningSubstream, cancelled, passThrough);
      } else {
        return this;
      }
    }

    /**
     * Commits to {@code winningSubstream}. If it is already drained, the result is pass-through
     * (buffer dropped, drained set = just the winner). Otherwise the drained set becomes empty and
     * the buffer is kept so the winner can finish catching up.
     */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State committed(Substream winningSubstream) {
      checkState(this.winningSubstream == null, "Already committed");

      boolean passThrough = false;
      List<BufferEntry> buffer = this.buffer;
      Collection<Substream> drainedSubstreams;

      if (this.drainedSubstreams.contains(winningSubstream)) {
        passThrough = true;
        buffer = null;
        drainedSubstreams = Collections.singleton(winningSubstream);
      } else {
        drainedSubstreams = Collections.emptyList();
      }

      return new State(buffer, drainedSubstreams, winningSubstream, cancelled, passThrough);
    }
  }
816 
817   /**
818    * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
819    *  attributes.
820    */
821   private static final class Substream {
822     ClientStream stream;
823 
824     // GuardedBy RetriableStream.lock
825     boolean closed;
826 
827     // setting to true must be GuardedBy RetriableStream.lock
828     boolean bufferLimitExceeded;
829 
830     final int previousAttempts;
831 
Substream(int previousAttempts)832     Substream(int previousAttempts) {
833       this.previousAttempts = previousAttempts;
834     }
835   }
836 
837 
/**
   * Traces the buffer used by a substream: counts the outbound wire bytes of one substream and
   * charges any growth beyond this RPC's current usage against the per-RPC and channel-wide
   * buffer limits.
   */
  class BufferSizeTracer extends ClientStreamTracer {
    // Each buffer size tracer is dedicated to one specific substream.
    private final Substream substream;

    // Cumulative outbound wire bytes this tracer has counted for its substream.
    @GuardedBy("lock")
    long bufferNeeded;

    BufferSizeTracer(Substream substream) {
      this.substream = substream;
    }

    /**
     * A message is sent to the wire, so its reference would be released if no retry or
     * hedging were involved. So at this point we have to hold the reference of the message longer
     * for retry, and we need to increment {@link #bufferNeeded} (a field of this tracer, not of
     * the substream).
     *
     * <p>Nothing is counted once a winning substream exists or this substream is closed. If the
     * new total exceeds {@code perRpcBufferLimit}, or the channel-wide total after charging the
     * growth exceeds {@code channelBufferLimit}, the substream is flagged
     * {@code bufferLimitExceeded} and {@code commit(substream)} is called under {@code lock}. The
     * task it returns, if non-null, is run after the lock is released.
     */
    @Override
    public void outboundWireSize(long bytes) {
      // Unlocked early exit; the same condition is re-checked under the lock below.
      if (state.winningSubstream != null) {
        return;
      }

      Runnable postCommitTask = null;

      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
      synchronized (lock) {
        if (state.winningSubstream != null || substream.closed) {
          return;
        }
        bufferNeeded += bytes;
        // Only growth beyond the RPC's current recorded usage (perRpcBufferUsed) is charged.
        if (bufferNeeded <= perRpcBufferUsed) {
          return;
        }

        if (bufferNeeded > perRpcBufferLimit) {
          substream.bufferLimitExceeded = true;
        } else {
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
          long savedChannelBufferUsed =
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
          perRpcBufferUsed = bufferNeeded;

          if (savedChannelBufferUsed > channelBufferLimit) {
            substream.bufferLimitExceeded = true;
          }
        }

        if (substream.bufferLimitExceeded) {
          postCommitTask = commit(substream);
        }
      }

      // Run the commit follow-up outside the lock.
      if (postCommitTask != null) {
        postCommitTask.run();
      }
    }
  }
898 
899   /**
900    *  Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
901    *  the Channel. There should be a single instance of it for each channel.
902    */
903   static final class ChannelBufferMeter {
904     private final AtomicLong bufferUsed = new AtomicLong();
905 
906     @VisibleForTesting
addAndGet(long newBytesUsed)907     long addAndGet(long newBytesUsed) {
908       return bufferUsed.addAndGet(newBytesUsed);
909     }
910   }
911 
912   /**
913    * Used for retry throttling.
914    */
915   static final class Throttle {
916 
917     private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;
918 
919     /**
920      * 1000 times the maxTokens field of the retryThrottling policy in service config.
921      * The number of tokens starts at maxTokens. The token_count will always be between 0 and
922      * maxTokens.
923      */
924     final int maxTokens;
925 
926     /**
927      * Half of {@code maxTokens}.
928      */
929     final int threshold;
930 
931     /**
932      * 1000 times the tokenRatio field of the retryThrottling policy in service config.
933      */
934     final int tokenRatio;
935 
936     final AtomicInteger tokenCount = new AtomicInteger();
937 
Throttle(float maxTokens, float tokenRatio)938     Throttle(float maxTokens, float tokenRatio) {
939       // tokenRatio is up to 3 decimal places
940       this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
941       this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
942       this.threshold = this.maxTokens / 2;
943       tokenCount.set(this.maxTokens);
944     }
945 
946     @VisibleForTesting
isAboveThreshold()947     boolean isAboveThreshold() {
948       return tokenCount.get() > threshold;
949     }
950 
951     /**
952      * Counts down the token on qualified failure and checks if it is above the threshold
953      * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
954      * a not-to-retry pushback.
955      */
956     @VisibleForTesting
onQualifiedFailureThenCheckIsAboveThreshold()957     boolean onQualifiedFailureThenCheckIsAboveThreshold() {
958       while (true) {
959         int currentCount = tokenCount.get();
960         if (currentCount == 0) {
961           return false;
962         }
963         int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
964         boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
965         if (updated) {
966           return decremented > threshold;
967         }
968       }
969     }
970 
971     @VisibleForTesting
onSuccess()972     void onSuccess() {
973       while (true) {
974         int currentCount = tokenCount.get();
975         if (currentCount == maxTokens) {
976           break;
977         }
978         int incremented = currentCount + tokenRatio;
979         boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
980         if (updated) {
981           break;
982         }
983       }
984     }
985 
986     @Override
equals(Object o)987     public boolean equals(Object o) {
988       if (this == o) {
989         return true;
990       }
991       if (!(o instanceof Throttle)) {
992         return false;
993       }
994       Throttle that = (Throttle) o;
995       return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
996     }
997 
998     @Override
hashCode()999     public int hashCode() {
1000       return Objects.hashCode(maxTokens, tokenRatio);
1001     }
1002   }
1003 
1004   private static final class RetryPlan {
1005     final boolean shouldRetry;
1006     // TODO(zdapeng) boolean hasHedging
1007     final long backoffNanos;
1008 
RetryPlan(boolean shouldRetry, long backoffNanos)1009     RetryPlan(boolean shouldRetry, long backoffNanos) {
1010       this.shouldRetry = shouldRetry;
1011       this.backoffNanos = backoffNanos;
1012     }
1013   }
1014 }
1015