1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.internal.ClientStreamListener.RpcProgress.DROPPED;
21 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
22 import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
23 import static io.grpc.internal.RetriableStream.GRPC_PREVIOUS_RPC_ATTEMPTS;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotSame;
27 import static org.junit.Assert.assertNull;
28 import static org.junit.Assert.assertTrue;
29 import static org.mockito.AdditionalAnswers.delegatesTo;
30 import static org.mockito.Matchers.any;
31 import static org.mockito.Matchers.anyInt;
32 import static org.mockito.Matchers.same;
33 import static org.mockito.Mockito.doReturn;
34 import static org.mockito.Mockito.inOrder;
35 import static org.mockito.Mockito.mock;
36 import static org.mockito.Mockito.never;
37 import static org.mockito.Mockito.times;
38 import static org.mockito.Mockito.verify;
39 import static org.mockito.Mockito.verifyNoMoreInteractions;
40 import static org.mockito.Mockito.when;
41 
42 import com.google.common.collect.ImmutableSet;
43 import com.google.common.util.concurrent.MoreExecutors;
44 import io.grpc.CallOptions;
45 import io.grpc.ClientStreamTracer;
46 import io.grpc.Codec;
47 import io.grpc.Compressor;
48 import io.grpc.DecompressorRegistry;
49 import io.grpc.Metadata;
50 import io.grpc.MethodDescriptor;
51 import io.grpc.MethodDescriptor.MethodType;
52 import io.grpc.Status;
53 import io.grpc.Status.Code;
54 import io.grpc.StringMarshaller;
55 import io.grpc.internal.RetriableStream.ChannelBufferMeter;
56 import io.grpc.internal.RetriableStream.Throttle;
57 import io.grpc.internal.StreamListener.MessageProducer;
58 import java.io.InputStream;
59 import java.util.ArrayList;
60 import java.util.List;
61 import java.util.Random;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.ScheduledExecutorService;
64 import java.util.concurrent.TimeUnit;
65 import java.util.concurrent.atomic.AtomicReference;
66 import javax.annotation.Nullable;
67 import org.junit.After;
68 import org.junit.Test;
69 import org.junit.runner.RunWith;
70 import org.junit.runners.JUnit4;
71 import org.mockito.ArgumentCaptor;
72 import org.mockito.InOrder;
73 
74 /** Unit tests for {@link RetriableStream}. */
75 @RunWith(JUnit4.class)
76 public class RetriableStreamTest {
77   private static final String CANCELLED_BECAUSE_COMMITTED =
78       "Stream thrown away because RetriableStream committed";
79   private static final String AUTHORITY = "fakeAuthority";
80   private static final Compressor COMPRESSOR = Codec.Identity.NONE;
81   private static final DecompressorRegistry DECOMPRESSOR_REGISTRY =
82       DecompressorRegistry.getDefaultInstance();
83   private static final int MAX_INBOUND_MESSAGE_SIZE = 1234;
84   private static final int MAX_OUTBOUND_MESSAGE_SIZE = 5678;
85   private static final long PER_RPC_BUFFER_LIMIT = 1000;
86   private static final long CHANNEL_BUFFER_LIMIT = 2000;
87   private static final int MAX_ATTEMPTS = 6;
88   private static final long INITIAL_BACKOFF_IN_SECONDS = 100;
89   private static final long MAX_BACKOFF_IN_SECONDS = 700;
90   private static final double BACKOFF_MULTIPLIER = 2D;
91   private static final double FAKE_RANDOM = .5D;
92 
93   static {
RetriableStream.setRandom( new Random() { @Override public double nextDouble() { return FAKE_RANDOM; } })94     RetriableStream.setRandom(
95         // not random
96         new Random() {
97           @Override
98           public double nextDouble() {
99             return FAKE_RANDOM;
100           }
101         });
102   }
103 
104   private static final Code RETRIABLE_STATUS_CODE_1 = Code.UNAVAILABLE;
105   private static final Code RETRIABLE_STATUS_CODE_2 = Code.DATA_LOSS;
106   private static final Code NON_RETRIABLE_STATUS_CODE = Code.INTERNAL;
107   private static final RetryPolicy RETRY_POLICY =
108       new RetryPolicy(
109           MAX_ATTEMPTS,
110           TimeUnit.SECONDS.toNanos(INITIAL_BACKOFF_IN_SECONDS),
111           TimeUnit.SECONDS.toNanos(MAX_BACKOFF_IN_SECONDS),
112           BACKOFF_MULTIPLIER,
113           ImmutableSet.of(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2));
114 
115   private final RetriableStreamRecorder retriableStreamRecorder =
116       mock(RetriableStreamRecorder.class);
117   private final ClientStreamListener masterListener = mock(ClientStreamListener.class);
118   private final MethodDescriptor<String, String> method =
119       MethodDescriptor.<String, String>newBuilder()
120           .setType(MethodType.BIDI_STREAMING)
121           .setFullMethodName(MethodDescriptor.generateFullMethodName("service_foo", "method_bar"))
122           .setRequestMarshaller(new StringMarshaller())
123           .setResponseMarshaller(new StringMarshaller())
124           .build();
125   private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
126   private final FakeClock fakeClock = new FakeClock();
127 
128   private final class RecordedRetriableStream extends RetriableStream<String> {
RecordedRetriableStream(MethodDescriptor<String, ?> method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, Executor callExecutor, ScheduledExecutorService scheduledExecutorService, final RetryPolicy retryPolicy, final HedgingPolicy hedgingPolicy, @Nullable Throttle throttle)129     RecordedRetriableStream(MethodDescriptor<String, ?> method, Metadata headers,
130         ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
131         Executor callExecutor,
132         ScheduledExecutorService scheduledExecutorService,
133         final RetryPolicy retryPolicy,
134         final HedgingPolicy hedgingPolicy,
135         @Nullable Throttle throttle) {
136       super(
137           method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, callExecutor,
138           scheduledExecutorService,
139           new RetryPolicy.Provider() {
140             @Override
141             public RetryPolicy get() {
142               return retryPolicy;
143             }
144           },
145           new HedgingPolicy.Provider() {
146             @Override
147             public HedgingPolicy get() {
148               return hedgingPolicy;
149             }
150           },
151           throttle);
152     }
153 
154     @Override
postCommit()155     void postCommit() {
156       retriableStreamRecorder.postCommit();
157     }
158 
159     @Override
newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata)160     ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) {
161       bufferSizeTracer =
162           tracerFactory.newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
163       int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null
164           ? 0 : Integer.valueOf(metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
165       return retriableStreamRecorder.newSubstream(actualPreviousRpcAttemptsInHeader);
166     }
167 
168     @Override
prestart()169     Status prestart() {
170       return retriableStreamRecorder.prestart();
171     }
172   }
173 
174   private final RetriableStream<String> retriableStream =
175       newThrottledRetriableStream(null /* throttle */);
176 
177   private ClientStreamTracer bufferSizeTracer;
178 
newThrottledRetriableStream(Throttle throttle)179   private RetriableStream<String> newThrottledRetriableStream(Throttle throttle) {
180     return new RecordedRetriableStream(
181         method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
182         MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY,
183         HedgingPolicy.DEFAULT, throttle);
184   }
185 
186   @After
tearDown()187   public void tearDown() {
188     assertEquals(0, fakeClock.numPendingTasks());
189   }
190 
191   @Test
retry_everythingDrained()192   public void retry_everythingDrained() {
193     ClientStream mockStream1 = mock(ClientStream.class);
194     ClientStream mockStream2 = mock(ClientStream.class);
195     ClientStream mockStream3 = mock(ClientStream.class);
196     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
197     InOrder inOrder =
198         inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3);
199 
200     // stream settings before start
201     retriableStream.setAuthority(AUTHORITY);
202     retriableStream.setCompressor(COMPRESSOR);
203     retriableStream.setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
204     retriableStream.setFullStreamDecompression(false);
205     retriableStream.setFullStreamDecompression(true);
206     retriableStream.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
207     retriableStream.setMessageCompression(true);
208     retriableStream.setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
209     retriableStream.setMessageCompression(false);
210 
211     inOrder.verifyNoMoreInteractions();
212 
213     // start
214     retriableStream.start(masterListener);
215 
216     inOrder.verify(retriableStreamRecorder).prestart();
217     inOrder.verify(retriableStreamRecorder).newSubstream(0);
218 
219     inOrder.verify(mockStream1).setAuthority(AUTHORITY);
220     inOrder.verify(mockStream1).setCompressor(COMPRESSOR);
221     inOrder.verify(mockStream1).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
222     inOrder.verify(mockStream1).setFullStreamDecompression(false);
223     inOrder.verify(mockStream1).setFullStreamDecompression(true);
224     inOrder.verify(mockStream1).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
225     inOrder.verify(mockStream1).setMessageCompression(true);
226     inOrder.verify(mockStream1).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
227     inOrder.verify(mockStream1).setMessageCompression(false);
228 
229     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
230         ArgumentCaptor.forClass(ClientStreamListener.class);
231     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
232     inOrder.verifyNoMoreInteractions();
233 
234     retriableStream.sendMessage("msg1");
235     retriableStream.sendMessage("msg2");
236     retriableStream.request(345);
237     retriableStream.flush();
238     retriableStream.flush();
239     retriableStream.sendMessage("msg3");
240     retriableStream.request(456);
241 
242     inOrder.verify(mockStream1, times(2)).writeMessage(any(InputStream.class));
243     inOrder.verify(mockStream1).request(345);
244     inOrder.verify(mockStream1, times(2)).flush();
245     inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
246     inOrder.verify(mockStream1).request(456);
247     inOrder.verifyNoMoreInteractions();
248 
249     // retry1
250     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
251     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
252     assertEquals(1, fakeClock.numPendingTasks());
253 
254     // send more messages during backoff
255     retriableStream.sendMessage("msg1 during backoff1");
256     retriableStream.sendMessage("msg2 during backoff1");
257 
258     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
259     inOrder.verifyNoMoreInteractions();
260     assertEquals(1, fakeClock.numPendingTasks());
261     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
262     assertEquals(0, fakeClock.numPendingTasks());
263     inOrder.verify(retriableStreamRecorder).newSubstream(1);
264     inOrder.verify(mockStream2).setAuthority(AUTHORITY);
265     inOrder.verify(mockStream2).setCompressor(COMPRESSOR);
266     inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
267     inOrder.verify(mockStream2).setFullStreamDecompression(false);
268     inOrder.verify(mockStream2).setFullStreamDecompression(true);
269     inOrder.verify(mockStream2).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
270     inOrder.verify(mockStream2).setMessageCompression(true);
271     inOrder.verify(mockStream2).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
272     inOrder.verify(mockStream2).setMessageCompression(false);
273 
274     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
275         ArgumentCaptor.forClass(ClientStreamListener.class);
276     inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
277     inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
278     inOrder.verify(mockStream2).request(345);
279     inOrder.verify(mockStream2, times(2)).flush();
280     inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
281     inOrder.verify(mockStream2).request(456);
282     inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
283     inOrder.verifyNoMoreInteractions();
284 
285     // send more messages
286     retriableStream.sendMessage("msg1 after retry1");
287     retriableStream.sendMessage("msg2 after retry1");
288 
289     // mockStream1 is closed so it is not in the drainedSubstreams
290     verifyNoMoreInteractions(mockStream1);
291     inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
292 
293     // retry2
294     doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
295     sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
296     assertEquals(1, fakeClock.numPendingTasks());
297 
298     // send more messages during backoff
299     retriableStream.sendMessage("msg1 during backoff2");
300     retriableStream.sendMessage("msg2 during backoff2");
301     retriableStream.sendMessage("msg3 during backoff2");
302 
303     fakeClock.forwardTime(
304         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
305         TimeUnit.SECONDS);
306     inOrder.verifyNoMoreInteractions();
307     assertEquals(1, fakeClock.numPendingTasks());
308     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
309     assertEquals(0, fakeClock.numPendingTasks());
310     inOrder.verify(retriableStreamRecorder).newSubstream(2);
311     inOrder.verify(mockStream3).setAuthority(AUTHORITY);
312     inOrder.verify(mockStream3).setCompressor(COMPRESSOR);
313     inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
314     inOrder.verify(mockStream3).setFullStreamDecompression(false);
315     inOrder.verify(mockStream3).setFullStreamDecompression(true);
316     inOrder.verify(mockStream3).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
317     inOrder.verify(mockStream3).setMessageCompression(true);
318     inOrder.verify(mockStream3).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE);
319     inOrder.verify(mockStream3).setMessageCompression(false);
320 
321     ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
322         ArgumentCaptor.forClass(ClientStreamListener.class);
323     inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
324     inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
325     inOrder.verify(mockStream3).request(345);
326     inOrder.verify(mockStream3, times(2)).flush();
327     inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
328     inOrder.verify(mockStream3).request(456);
329     inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class));
330     inOrder.verifyNoMoreInteractions();
331 
332     // no more retry
333     sublistenerCaptor3.getValue().closed(
334         Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata());
335 
336     inOrder.verify(retriableStreamRecorder).postCommit();
337     inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class));
338     inOrder.verifyNoMoreInteractions();
339   }
340 
341   @Test
headersRead_cancel()342   public void headersRead_cancel() {
343     ClientStream mockStream1 = mock(ClientStream.class);
344     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
345     InOrder inOrder = inOrder(retriableStreamRecorder);
346 
347     retriableStream.start(masterListener);
348 
349     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
350         ArgumentCaptor.forClass(ClientStreamListener.class);
351     verify(mockStream1).start(sublistenerCaptor1.capture());
352 
353     sublistenerCaptor1.getValue().headersRead(new Metadata());
354 
355     inOrder.verify(retriableStreamRecorder).postCommit();
356 
357     retriableStream.cancel(Status.CANCELLED);
358 
359     inOrder.verify(retriableStreamRecorder, never()).postCommit();
360   }
361 
362   @Test
retry_headersRead_cancel()363   public void retry_headersRead_cancel() {
364     ClientStream mockStream1 = mock(ClientStream.class);
365     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
366     InOrder inOrder = inOrder(retriableStreamRecorder);
367 
368     retriableStream.start(masterListener);
369 
370     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
371         ArgumentCaptor.forClass(ClientStreamListener.class);
372     verify(mockStream1).start(sublistenerCaptor1.capture());
373 
374     // retry
375     ClientStream mockStream2 = mock(ClientStream.class);
376     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
377     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
378     assertEquals(1, fakeClock.numPendingTasks());
379     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
380 
381     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
382         ArgumentCaptor.forClass(ClientStreamListener.class);
383     verify(mockStream2).start(sublistenerCaptor2.capture());
384     inOrder.verify(retriableStreamRecorder, never()).postCommit();
385 
386     // headersRead
387     sublistenerCaptor2.getValue().headersRead(new Metadata());
388 
389     inOrder.verify(retriableStreamRecorder).postCommit();
390 
391     // cancel
392     retriableStream.cancel(Status.CANCELLED);
393 
394     inOrder.verify(retriableStreamRecorder, never()).postCommit();
395   }
396 
397   @Test
headersRead_closed()398   public void headersRead_closed() {
399     ClientStream mockStream1 = mock(ClientStream.class);
400     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
401     InOrder inOrder = inOrder(retriableStreamRecorder);
402 
403     retriableStream.start(masterListener);
404 
405     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
406         ArgumentCaptor.forClass(ClientStreamListener.class);
407     verify(mockStream1).start(sublistenerCaptor1.capture());
408 
409     sublistenerCaptor1.getValue().headersRead(new Metadata());
410 
411     inOrder.verify(retriableStreamRecorder).postCommit();
412 
413     Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
414     Metadata metadata = new Metadata();
415     sublistenerCaptor1.getValue().closed(status, metadata);
416 
417     verify(masterListener).closed(status, metadata);
418     inOrder.verify(retriableStreamRecorder, never()).postCommit();
419   }
420 
421   @Test
retry_headersRead_closed()422   public void retry_headersRead_closed() {
423     ClientStream mockStream1 = mock(ClientStream.class);
424     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
425     InOrder inOrder = inOrder(retriableStreamRecorder);
426 
427     retriableStream.start(masterListener);
428 
429     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
430         ArgumentCaptor.forClass(ClientStreamListener.class);
431     verify(mockStream1).start(sublistenerCaptor1.capture());
432 
433     // retry
434     ClientStream mockStream2 = mock(ClientStream.class);
435     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
436     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
437     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
438 
439     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
440         ArgumentCaptor.forClass(ClientStreamListener.class);
441     verify(mockStream2).start(sublistenerCaptor2.capture());
442     inOrder.verify(retriableStreamRecorder, never()).postCommit();
443 
444     // headersRead
445     sublistenerCaptor2.getValue().headersRead(new Metadata());
446 
447     inOrder.verify(retriableStreamRecorder).postCommit();
448 
449     // closed even with retriable status
450     Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
451     Metadata metadata = new Metadata();
452     sublistenerCaptor2.getValue().closed(status, metadata);
453 
454     verify(masterListener).closed(status, metadata);
455     inOrder.verify(retriableStreamRecorder, never()).postCommit();
456   }
457 
458   @Test
cancel_closed()459   public void cancel_closed() {
460     ClientStream mockStream1 = mock(ClientStream.class);
461     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
462     InOrder inOrder = inOrder(retriableStreamRecorder);
463 
464     retriableStream.start(masterListener);
465 
466     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
467         ArgumentCaptor.forClass(ClientStreamListener.class);
468     verify(mockStream1).start(sublistenerCaptor1.capture());
469 
470     // cancel
471     retriableStream.cancel(Status.CANCELLED);
472 
473     inOrder.verify(retriableStreamRecorder).postCommit();
474     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
475     verify(mockStream1).cancel(statusCaptor.capture());
476     assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
477     assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
478 
479     // closed even with retriable status
480     Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
481     Metadata metadata = new Metadata();
482     sublistenerCaptor1.getValue().closed(status, metadata);
483     inOrder.verify(retriableStreamRecorder, never()).postCommit();
484   }
485 
486   @Test
retry_cancel_closed()487   public void retry_cancel_closed() {
488     ClientStream mockStream1 = mock(ClientStream.class);
489     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
490     InOrder inOrder = inOrder(retriableStreamRecorder);
491 
492     retriableStream.start(masterListener);
493 
494     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
495         ArgumentCaptor.forClass(ClientStreamListener.class);
496     verify(mockStream1).start(sublistenerCaptor1.capture());
497 
498     // retry
499     ClientStream mockStream2 = mock(ClientStream.class);
500     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
501     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
502     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
503 
504     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
505         ArgumentCaptor.forClass(ClientStreamListener.class);
506     verify(mockStream2).start(sublistenerCaptor2.capture());
507     inOrder.verify(retriableStreamRecorder, never()).postCommit();
508 
509     // cancel
510     retriableStream.cancel(Status.CANCELLED);
511 
512     inOrder.verify(retriableStreamRecorder).postCommit();
513     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
514     verify(mockStream2).cancel(statusCaptor.capture());
515     assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
516     assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
517 
518     // closed
519     Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE);
520     Metadata metadata = new Metadata();
521     sublistenerCaptor2.getValue().closed(status, metadata);
522     inOrder.verify(retriableStreamRecorder, never()).postCommit();
523   }
524 
525   @Test
unretriableClosed_cancel()526   public void unretriableClosed_cancel() {
527     ClientStream mockStream1 = mock(ClientStream.class);
528     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
529     InOrder inOrder = inOrder(retriableStreamRecorder);
530 
531     retriableStream.start(masterListener);
532 
533     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
534         ArgumentCaptor.forClass(ClientStreamListener.class);
535     verify(mockStream1).start(sublistenerCaptor1.capture());
536 
537     // closed
538     Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE);
539     Metadata metadata = new Metadata();
540     sublistenerCaptor1.getValue().closed(status, metadata);
541 
542     inOrder.verify(retriableStreamRecorder).postCommit();
543     verify(masterListener).closed(status, metadata);
544 
545     // cancel
546     retriableStream.cancel(Status.CANCELLED);
547     inOrder.verify(retriableStreamRecorder, never()).postCommit();
548   }
549 
550   @Test
retry_unretriableClosed_cancel()551   public void retry_unretriableClosed_cancel() {
552     ClientStream mockStream1 = mock(ClientStream.class);
553     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
554     InOrder inOrder = inOrder(retriableStreamRecorder);
555 
556     retriableStream.start(masterListener);
557 
558     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
559         ArgumentCaptor.forClass(ClientStreamListener.class);
560     verify(mockStream1).start(sublistenerCaptor1.capture());
561 
562     // retry
563     ClientStream mockStream2 = mock(ClientStream.class);
564     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
565     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
566     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
567 
568     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
569         ArgumentCaptor.forClass(ClientStreamListener.class);
570     verify(mockStream2).start(sublistenerCaptor2.capture());
571     inOrder.verify(retriableStreamRecorder, never()).postCommit();
572 
573     // closed
574     Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE);
575     Metadata metadata = new Metadata();
576     sublistenerCaptor2.getValue().closed(status, metadata);
577 
578     inOrder.verify(retriableStreamRecorder).postCommit();
579     verify(masterListener).closed(status, metadata);
580 
581     // cancel
582     retriableStream.cancel(Status.CANCELLED);
583     inOrder.verify(retriableStreamRecorder, never()).postCommit();
584   }
585 
586   @Test
retry_cancelWhileBackoff()587   public void retry_cancelWhileBackoff() {
588     ClientStream mockStream1 = mock(ClientStream.class);
589     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
590 
591     retriableStream.start(masterListener);
592 
593     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
594         ArgumentCaptor.forClass(ClientStreamListener.class);
595     verify(mockStream1).start(sublistenerCaptor1.capture());
596 
597     // retry
598     ClientStream mockStream2 = mock(ClientStream.class);
599     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
600     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
601 
602     // cancel while backoff
603     assertEquals(1, fakeClock.numPendingTasks());
604     verify(retriableStreamRecorder, never()).postCommit();
605     retriableStream.cancel(Status.CANCELLED);
606     verify(retriableStreamRecorder).postCommit();
607 
608     verifyNoMoreInteractions(mockStream1);
609     verifyNoMoreInteractions(mockStream2);
610   }
611 
612   @Test
operationsWhileDraining()613   public void operationsWhileDraining() {
614     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
615         ArgumentCaptor.forClass(ClientStreamListener.class);
616     final AtomicReference<ClientStreamListener> sublistenerCaptor2 =
617         new AtomicReference<ClientStreamListener>();
618     final Status cancelStatus = Status.CANCELLED.withDescription("c");
619     ClientStream mockStream1 =
620         mock(
621             ClientStream.class,
622             delegatesTo(
623                 new NoopClientStream() {
624                   @Override
625                   public void request(int numMessages) {
626                     retriableStream.sendMessage("substream1 request " + numMessages);
627                     if (numMessages > 1) {
628                       retriableStream.request(--numMessages);
629                     }
630                   }
631                 }));
632 
633     final ClientStream mockStream2 =
634         mock(
635             ClientStream.class,
636             delegatesTo(
637                 new NoopClientStream() {
638                   @Override
639                   public void start(ClientStreamListener listener) {
640                     sublistenerCaptor2.set(listener);
641                   }
642 
643                   @Override
644                   public void request(int numMessages) {
645                     retriableStream.sendMessage("substream2 request " + numMessages);
646 
647                     if (numMessages == 3) {
648                       sublistenerCaptor2.get().headersRead(new Metadata());
649                     }
650                     if (numMessages == 2) {
651                       retriableStream.request(100);
652                     }
653                     if (numMessages == 100) {
654                       retriableStream.cancel(cancelStatus);
655                     }
656                   }
657                 }));
658 
659     InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
660 
661     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
662     retriableStream.start(masterListener);
663 
664     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
665 
666     retriableStream.request(3);
667 
668     inOrder.verify(mockStream1).request(3);
669     inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 3"
670     inOrder.verify(mockStream1).request(2);
671     inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2"
672     inOrder.verify(mockStream1).request(1);
673     inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1"
674 
675     // retry
676     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
677     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
678     assertEquals(1, fakeClock.numPendingTasks());
679 
680     // send more requests during backoff
681     retriableStream.request(789);
682 
683     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
684 
685     inOrder.verify(mockStream2).start(sublistenerCaptor2.get());
686     inOrder.verify(mockStream2).request(3);
687     inOrder.verify(retriableStreamRecorder).postCommit();
688     inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); // msg "substream1 request 3"
689     inOrder.verify(mockStream2).request(2);
690     inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); // msg "substream1 request 2"
691     inOrder.verify(mockStream2).request(1);
692 
693     // msg "substream1 request 1"
694     inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
695     inOrder.verify(mockStream2).request(789);
696     // msg "substream2 request 3"
697     // msg "substream2 request 2"
698     inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
699     inOrder.verify(mockStream2).request(100);
700 
701     verify(mockStream2).cancel(cancelStatus);
702 
703     // "substream2 request 1" will never be sent
704     inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class));
705   }
706 
707   @Test
operationsAfterImmediateCommit()708   public void operationsAfterImmediateCommit() {
709     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
710         ArgumentCaptor.forClass(ClientStreamListener.class);
711     ClientStream mockStream1 = mock(ClientStream.class);
712 
713     InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1);
714 
715     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
716     retriableStream.start(masterListener);
717 
718     // drained
719     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
720 
721     // commit
722     sublistenerCaptor1.getValue().headersRead(new Metadata());
723 
724     retriableStream.request(3);
725     inOrder.verify(mockStream1).request(3);
726     retriableStream.sendMessage("msg 1");
727     inOrder.verify(mockStream1).writeMessage(any(InputStream.class));
728   }
729 
730   @Test
isReady_whenDrained()731   public void isReady_whenDrained() {
732     ClientStream mockStream1 = mock(ClientStream.class);
733 
734     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
735     retriableStream.start(masterListener);
736 
737     assertFalse(retriableStream.isReady());
738 
739     doReturn(true).when(mockStream1).isReady();
740 
741     assertTrue(retriableStream.isReady());
742   }
743 
744   @Test
isReady_whileDraining()745   public void isReady_whileDraining() {
746     final AtomicReference<ClientStreamListener> sublistenerCaptor1 =
747         new AtomicReference<ClientStreamListener>();
748     final List<Boolean> readiness = new ArrayList<>();
749     ClientStream mockStream1 =
750         mock(
751             ClientStream.class,
752             delegatesTo(
753                 new NoopClientStream() {
754                   @Override
755                   public void start(ClientStreamListener listener) {
756                     sublistenerCaptor1.set(listener);
757                     readiness.add(retriableStream.isReady()); // expected false b/c in draining
758                   }
759 
760                   @Override
761                   public boolean isReady() {
762                     return true;
763                   }
764                 }));
765 
766     final ClientStream mockStream2 =
767         mock(
768             ClientStream.class,
769             delegatesTo(
770                 new NoopClientStream() {
771                   @Override
772                   public void start(ClientStreamListener listener) {
773                     readiness.add(retriableStream.isReady()); // expected false b/c in draining
774                   }
775 
776                   @Override
777                   public boolean isReady() {
778                     return true;
779                   }
780                 }));
781 
782     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
783     retriableStream.start(masterListener);
784 
785     verify(mockStream1).start(sublistenerCaptor1.get());
786     readiness.add(retriableStream.isReady()); // expected true
787 
788     // retry
789     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
790     doReturn(false).when(mockStream1).isReady(); // mockStream1 closed, so isReady false
791     sublistenerCaptor1.get().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
792     assertEquals(1, fakeClock.numPendingTasks());
793 
794     // send more requests during backoff
795     retriableStream.request(789);
796     readiness.add(retriableStream.isReady()); // expected false b/c in backoff
797 
798     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
799 
800     verify(mockStream2).start(any(ClientStreamListener.class));
801     readiness.add(retriableStream.isReady()); // expected true
802 
803     assertThat(readiness).containsExactly(false, true, false, false, true).inOrder();
804   }
805 
806   @Test
messageAvailable()807   public void messageAvailable() {
808     ClientStream mockStream1 = mock(ClientStream.class);
809     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
810 
811     retriableStream.start(masterListener);
812 
813     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
814         ArgumentCaptor.forClass(ClientStreamListener.class);
815     verify(mockStream1).start(sublistenerCaptor1.capture());
816 
817     ClientStreamListener listener = sublistenerCaptor1.getValue();
818     listener.headersRead(new Metadata());
819     MessageProducer messageProducer = mock(MessageProducer.class);
820     listener.messagesAvailable(messageProducer);
821     verify(masterListener).messagesAvailable(messageProducer);
822   }
823 
824   @Test
closedWhileDraining()825   public void closedWhileDraining() {
826     ClientStream mockStream1 = mock(ClientStream.class);
827     final ClientStream mockStream2 =
828         mock(
829             ClientStream.class,
830             delegatesTo(
831                 new NoopClientStream() {
832                   @Override
833                   public void start(ClientStreamListener listener) {
834                     // closed while draning
835                     listener.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
836                   }
837                 }));
838     final ClientStream mockStream3 = mock(ClientStream.class);
839 
840     when(retriableStreamRecorder.newSubstream(anyInt()))
841         .thenReturn(mockStream1, mockStream2, mockStream3);
842 
843     retriableStream.start(masterListener);
844     retriableStream.sendMessage("msg1");
845     retriableStream.sendMessage("msg2");
846 
847     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
848         ArgumentCaptor.forClass(ClientStreamListener.class);
849     verify(mockStream1).start(sublistenerCaptor1.capture());
850 
851     ClientStreamListener listener1 = sublistenerCaptor1.getValue();
852 
853     // retry
854     listener1.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
855     assertEquals(1, fakeClock.numPendingTasks());
856     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
857     assertEquals(1, fakeClock.numPendingTasks());
858 
859     // send requests during backoff
860     retriableStream.request(3);
861     fakeClock.forwardTime(
862         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
863 
864     retriableStream.request(1);
865     verify(mockStream1, never()).request(anyInt());
866     verify(mockStream2, never()).request(anyInt());
867     verify(mockStream3).request(3);
868     verify(mockStream3).request(1);
869   }
870 
871 
872   @Test
perRpcBufferLimitExceeded()873   public void perRpcBufferLimitExceeded() {
874     ClientStream mockStream1 = mock(ClientStream.class);
875     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
876 
877     retriableStream.start(masterListener);
878 
879     bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT);
880 
881     assertEquals(PER_RPC_BUFFER_LIMIT, channelBufferUsed.addAndGet(0));
882 
883     verify(retriableStreamRecorder, never()).postCommit();
884     bufferSizeTracer.outboundWireSize(2);
885     verify(retriableStreamRecorder).postCommit();
886 
887     // verify channel buffer is adjusted
888     assertEquals(0, channelBufferUsed.addAndGet(0));
889   }
890 
891   @Test
perRpcBufferLimitExceededDuringBackoff()892   public void perRpcBufferLimitExceededDuringBackoff() {
893     ClientStream mockStream1 = mock(ClientStream.class);
894     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
895 
896     retriableStream.start(masterListener);
897 
898     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
899         ArgumentCaptor.forClass(ClientStreamListener.class);
900     verify(mockStream1).start(sublistenerCaptor1.capture());
901 
902     bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
903 
904     // retry
905     ClientStream mockStream2 = mock(ClientStream.class);
906     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
907     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
908 
909     // bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed
910     assertEquals(1, fakeClock.numPendingTasks());
911     bufferSizeTracer.outboundWireSize(2);
912     verify(retriableStreamRecorder, never()).postCommit();
913 
914     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
915     verify(mockStream2).start(any(ClientStreamListener.class));
916 
917     // bufferLimitExceeded
918     bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
919     verify(retriableStreamRecorder, never()).postCommit();
920     bufferSizeTracer.outboundWireSize(2);
921     verify(retriableStreamRecorder).postCommit();
922 
923     verifyNoMoreInteractions(mockStream1);
924     verifyNoMoreInteractions(mockStream2);
925   }
926 
927   @Test
channelBufferLimitExceeded()928   public void channelBufferLimitExceeded() {
929     ClientStream mockStream1 = mock(ClientStream.class);
930     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
931 
932     retriableStream.start(masterListener);
933 
934     bufferSizeTracer.outboundWireSize(100);
935 
936     assertEquals(100, channelBufferUsed.addAndGet(0));
937 
938     channelBufferUsed.addAndGet(CHANNEL_BUFFER_LIMIT - 200);
939     verify(retriableStreamRecorder, never()).postCommit();
940     bufferSizeTracer.outboundWireSize(100 + 1);
941     verify(retriableStreamRecorder).postCommit();
942 
943     // verify channel buffer is adjusted
944     assertEquals(CHANNEL_BUFFER_LIMIT - 200, channelBufferUsed.addAndGet(0));
945   }
946 
947   @Test
updateHeaders()948   public void updateHeaders() {
949     Metadata originalHeaders = new Metadata();
950     Metadata headers = retriableStream.updateHeaders(originalHeaders, 0);
951     assertNotSame(originalHeaders, headers);
952     assertNull(headers.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
953 
954     headers = retriableStream.updateHeaders(originalHeaders, 345);
955     assertEquals("345", headers.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
956     assertNull(originalHeaders.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
957   }
958 
959   @Test
expBackoff_maxBackoff_maxRetryAttempts()960   public void expBackoff_maxBackoff_maxRetryAttempts() {
961     ClientStream mockStream1 = mock(ClientStream.class);
962     ClientStream mockStream2 = mock(ClientStream.class);
963     ClientStream mockStream3 = mock(ClientStream.class);
964     ClientStream mockStream4 = mock(ClientStream.class);
965     ClientStream mockStream5 = mock(ClientStream.class);
966     ClientStream mockStream6 = mock(ClientStream.class);
967     ClientStream mockStream7 = mock(ClientStream.class);
968     InOrder inOrder = inOrder(
969         mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
970     when(retriableStreamRecorder.newSubstream(anyInt())).thenReturn(
971         mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
972 
973     retriableStream.start(masterListener);
974     assertEquals(0, fakeClock.numPendingTasks());
975     verify(retriableStreamRecorder).newSubstream(0);
976     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
977         ArgumentCaptor.forClass(ClientStreamListener.class);
978     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
979     inOrder.verifyNoMoreInteractions();
980 
981 
982     // retry1
983     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
984     assertEquals(1, fakeClock.numPendingTasks());
985     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
986     assertEquals(1, fakeClock.numPendingTasks());
987     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
988     assertEquals(0, fakeClock.numPendingTasks());
989     verify(retriableStreamRecorder).newSubstream(1);
990     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
991         ArgumentCaptor.forClass(ClientStreamListener.class);
992     inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
993     inOrder.verifyNoMoreInteractions();
994 
995     // retry2
996     sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
997     assertEquals(1, fakeClock.numPendingTasks());
998     fakeClock.forwardTime(
999         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
1000         TimeUnit.SECONDS);
1001     assertEquals(1, fakeClock.numPendingTasks());
1002     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1003     assertEquals(0, fakeClock.numPendingTasks());
1004     verify(retriableStreamRecorder).newSubstream(2);
1005     ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
1006         ArgumentCaptor.forClass(ClientStreamListener.class);
1007     inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
1008     inOrder.verifyNoMoreInteractions();
1009 
1010     // retry3
1011     sublistenerCaptor3.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
1012     assertEquals(1, fakeClock.numPendingTasks());
1013     fakeClock.forwardTime(
1014         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
1015             - 1L,
1016         TimeUnit.SECONDS);
1017     assertEquals(1, fakeClock.numPendingTasks());
1018     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1019     assertEquals(0, fakeClock.numPendingTasks());
1020     verify(retriableStreamRecorder).newSubstream(3);
1021     ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
1022         ArgumentCaptor.forClass(ClientStreamListener.class);
1023     inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
1024     inOrder.verifyNoMoreInteractions();
1025 
1026     // retry4
1027     sublistenerCaptor4.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
1028     assertEquals(1, fakeClock.numPendingTasks());
1029     fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
1030     assertEquals(1, fakeClock.numPendingTasks());
1031     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1032     assertEquals(0, fakeClock.numPendingTasks());
1033     verify(retriableStreamRecorder).newSubstream(4);
1034     ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
1035         ArgumentCaptor.forClass(ClientStreamListener.class);
1036     inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
1037     inOrder.verifyNoMoreInteractions();
1038 
1039     // retry5
1040     sublistenerCaptor5.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
1041     assertEquals(1, fakeClock.numPendingTasks());
1042     fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
1043     assertEquals(1, fakeClock.numPendingTasks());
1044     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1045     assertEquals(0, fakeClock.numPendingTasks());
1046     verify(retriableStreamRecorder).newSubstream(5);
1047     ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
1048         ArgumentCaptor.forClass(ClientStreamListener.class);
1049     inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
1050     inOrder.verifyNoMoreInteractions();
1051 
1052     // can not retry any more
1053     verify(retriableStreamRecorder, never()).postCommit();
1054     sublistenerCaptor6.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
1055     verify(retriableStreamRecorder).postCommit();
1056     inOrder.verifyNoMoreInteractions();
1057   }
1058 
1059   @Test
pushback()1060   public void pushback() {
1061     ClientStream mockStream1 = mock(ClientStream.class);
1062     ClientStream mockStream2 = mock(ClientStream.class);
1063     ClientStream mockStream3 = mock(ClientStream.class);
1064     ClientStream mockStream4 = mock(ClientStream.class);
1065     ClientStream mockStream5 = mock(ClientStream.class);
1066     ClientStream mockStream6 = mock(ClientStream.class);
1067     ClientStream mockStream7 = mock(ClientStream.class);
1068     InOrder inOrder = inOrder(
1069         mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
1070     when(retriableStreamRecorder.newSubstream(anyInt())).thenReturn(
1071         mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7);
1072 
1073     retriableStream.start(masterListener);
1074     assertEquals(0, fakeClock.numPendingTasks());
1075     verify(retriableStreamRecorder).newSubstream(0);
1076     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1077         ArgumentCaptor.forClass(ClientStreamListener.class);
1078     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
1079     inOrder.verifyNoMoreInteractions();
1080 
1081 
1082     // retry1
1083     int pushbackInMillis = 123;
1084     Metadata headers = new Metadata();
1085     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
1086     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
1087     assertEquals(1, fakeClock.numPendingTasks());
1088     fakeClock.forwardTime(pushbackInMillis - 1, TimeUnit.MILLISECONDS);
1089     assertEquals(1, fakeClock.numPendingTasks());
1090     fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
1091     assertEquals(0, fakeClock.numPendingTasks());
1092     verify(retriableStreamRecorder).newSubstream(1);
1093     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
1094         ArgumentCaptor.forClass(ClientStreamListener.class);
1095     inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
1096     inOrder.verifyNoMoreInteractions();
1097 
1098     // retry2
1099     pushbackInMillis = 4567 * 1000;
1100     headers = new Metadata();
1101     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
1102     sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), headers);
1103     assertEquals(1, fakeClock.numPendingTasks());
1104     fakeClock.forwardTime(pushbackInMillis - 1, TimeUnit.MILLISECONDS);
1105     assertEquals(1, fakeClock.numPendingTasks());
1106     fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS);
1107     assertEquals(0, fakeClock.numPendingTasks());
1108     verify(retriableStreamRecorder).newSubstream(2);
1109     ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
1110         ArgumentCaptor.forClass(ClientStreamListener.class);
1111     inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
1112     inOrder.verifyNoMoreInteractions();
1113 
1114     // retry3
1115     sublistenerCaptor3.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
1116     assertEquals(1, fakeClock.numPendingTasks());
1117     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS);
1118     assertEquals(1, fakeClock.numPendingTasks());
1119     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1120     assertEquals(0, fakeClock.numPendingTasks());
1121     verify(retriableStreamRecorder).newSubstream(3);
1122     ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
1123         ArgumentCaptor.forClass(ClientStreamListener.class);
1124     inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
1125     inOrder.verifyNoMoreInteractions();
1126 
1127     // retry4
1128     sublistenerCaptor4.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
1129     assertEquals(1, fakeClock.numPendingTasks());
1130     fakeClock.forwardTime(
1131         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L,
1132         TimeUnit.SECONDS);
1133     assertEquals(1, fakeClock.numPendingTasks());
1134     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1135     assertEquals(0, fakeClock.numPendingTasks());
1136     verify(retriableStreamRecorder).newSubstream(4);
1137     ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
1138         ArgumentCaptor.forClass(ClientStreamListener.class);
1139     inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
1140     inOrder.verifyNoMoreInteractions();
1141 
1142     // retry5
1143     sublistenerCaptor5.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata());
1144     assertEquals(1, fakeClock.numPendingTasks());
1145     fakeClock.forwardTime(
1146         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM)
1147             - 1L,
1148         TimeUnit.SECONDS);
1149     assertEquals(1, fakeClock.numPendingTasks());
1150     fakeClock.forwardTime(1L, TimeUnit.SECONDS);
1151     assertEquals(0, fakeClock.numPendingTasks());
1152     verify(retriableStreamRecorder).newSubstream(5);
1153     ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
1154         ArgumentCaptor.forClass(ClientStreamListener.class);
1155     inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
1156     inOrder.verifyNoMoreInteractions();
1157 
1158     // can not retry any more even pushback is positive
1159     pushbackInMillis = 4567 * 1000;
1160     headers = new Metadata();
1161     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
1162     verify(retriableStreamRecorder, never()).postCommit();
1163     sublistenerCaptor6.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
1164     verify(retriableStreamRecorder).postCommit();
1165     inOrder.verifyNoMoreInteractions();
1166   }
1167 
1168   @Test
pushback_noRetry()1169   public void pushback_noRetry() {
1170     ClientStream mockStream1 = mock(ClientStream.class);
1171     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(anyInt());
1172 
1173     retriableStream.start(masterListener);
1174     assertEquals(0, fakeClock.numPendingTasks());
1175     verify(retriableStreamRecorder).newSubstream(0);
1176     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1177         ArgumentCaptor.forClass(ClientStreamListener.class);
1178     verify(mockStream1).start(sublistenerCaptor1.capture());
1179     verify(retriableStreamRecorder, never()).postCommit();
1180 
1181     // pushback no retry
1182     Metadata headers = new Metadata();
1183     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "");
1184     sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
1185 
1186     verify(retriableStreamRecorder, never()).newSubstream(1);
1187     verify(retriableStreamRecorder).postCommit();
1188   }
1189 
1190   @Test
throttle()1191   public void throttle() {
1192     Throttle throttle = new Throttle(4f, 0.8f);
1193     assertTrue(throttle.isAboveThreshold());
1194     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 3
1195     assertTrue(throttle.isAboveThreshold());
1196     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 2
1197     assertFalse(throttle.isAboveThreshold());
1198     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 1
1199     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 0
1200     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 0
1201     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 0
1202     assertFalse(throttle.isAboveThreshold());
1203 
1204     throttle.onSuccess(); // token = 0.8
1205     assertFalse(throttle.isAboveThreshold());
1206     throttle.onSuccess(); // token = 1.6
1207     assertFalse(throttle.isAboveThreshold());
1208     throttle.onSuccess(); // token = 3.2
1209     assertTrue(throttle.isAboveThreshold());
1210     throttle.onSuccess(); // token = 4
1211     assertTrue(throttle.isAboveThreshold());
1212     throttle.onSuccess(); // token = 4
1213     assertTrue(throttle.isAboveThreshold());
1214     throttle.onSuccess(); // token = 4
1215     assertTrue(throttle.isAboveThreshold());
1216 
1217     assertTrue(throttle.isAboveThreshold());
1218     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 3
1219     assertTrue(throttle.isAboveThreshold());
1220     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 2
1221     assertFalse(throttle.isAboveThreshold());
1222   }
1223 
1224   @Test
throttledStream_FailWithRetriableStatusCode_WithoutPushback()1225   public void throttledStream_FailWithRetriableStatusCode_WithoutPushback() {
1226     Throttle throttle = new Throttle(4f, 0.8f);
1227     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1228 
1229     ClientStream mockStream = mock(ClientStream.class);
1230     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1231     retriableStream.start(masterListener);
1232     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1233         ArgumentCaptor.forClass(ClientStreamListener.class);
1234     verify(mockStream).start(sublistenerCaptor.capture());
1235 
1236     // mimic some other call in the channel triggers a throttle countdown
1237     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1238 
1239     sublistenerCaptor.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata());
1240     verify(retriableStreamRecorder).postCommit();
1241     assertFalse(throttle.isAboveThreshold()); // count = 2
1242   }
1243 
1244   @Test
throttledStream_FailWithNonRetriableStatusCode_WithoutPushback()1245   public void throttledStream_FailWithNonRetriableStatusCode_WithoutPushback() {
1246     Throttle throttle = new Throttle(4f, 0.8f);
1247     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1248 
1249     ClientStream mockStream = mock(ClientStream.class);
1250     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1251     retriableStream.start(masterListener);
1252     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1253         ArgumentCaptor.forClass(ClientStreamListener.class);
1254     verify(mockStream).start(sublistenerCaptor.capture());
1255 
1256     // mimic some other call in the channel triggers a throttle countdown
1257     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1258 
1259     sublistenerCaptor.getValue().closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata());
1260     verify(retriableStreamRecorder).postCommit();
1261     assertTrue(throttle.isAboveThreshold()); // count = 3
1262 
1263     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 2
1264   }
1265 
1266   @Test
throttledStream_FailWithRetriableStatusCode_WithRetriablePushback()1267   public void throttledStream_FailWithRetriableStatusCode_WithRetriablePushback() {
1268     Throttle throttle = new Throttle(4f, 0.8f);
1269     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1270 
1271     ClientStream mockStream = mock(ClientStream.class);
1272     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1273     retriableStream.start(masterListener);
1274     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1275         ArgumentCaptor.forClass(ClientStreamListener.class);
1276     verify(mockStream).start(sublistenerCaptor.capture());
1277 
1278     // mimic some other call in the channel triggers a throttle countdown
1279     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1280 
1281     int pushbackInMillis = 123;
1282     Metadata headers = new Metadata();
1283     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
1284     sublistenerCaptor.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
1285     verify(retriableStreamRecorder).postCommit();
1286     assertFalse(throttle.isAboveThreshold()); // count = 2
1287   }
1288 
1289   @Test
throttledStream_FailWithNonRetriableStatusCode_WithRetriablePushback()1290   public void throttledStream_FailWithNonRetriableStatusCode_WithRetriablePushback() {
1291     Throttle throttle = new Throttle(4f, 0.8f);
1292     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1293 
1294     ClientStream mockStream = mock(ClientStream.class);
1295     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1296     retriableStream.start(masterListener);
1297     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1298         ArgumentCaptor.forClass(ClientStreamListener.class);
1299     verify(mockStream).start(sublistenerCaptor.capture());
1300 
1301     // mimic some other call in the channel triggers a throttle countdown
1302     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1303 
1304     int pushbackInMillis = 123;
1305     Metadata headers = new Metadata();
1306     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis);
1307     sublistenerCaptor.getValue().closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), headers);
1308     verify(retriableStreamRecorder, never()).postCommit();
1309     assertTrue(throttle.isAboveThreshold()); // count = 3
1310 
1311     // drain pending retry
1312     fakeClock.forwardTime(pushbackInMillis, TimeUnit.MILLISECONDS);
1313 
1314     assertTrue(throttle.isAboveThreshold()); // count = 3
1315     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 2
1316   }
1317 
1318   @Test
throttledStream_FailWithRetriableStatusCode_WithNonRetriablePushback()1319   public void throttledStream_FailWithRetriableStatusCode_WithNonRetriablePushback() {
1320     Throttle throttle = new Throttle(4f, 0.8f);
1321     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1322 
1323     ClientStream mockStream = mock(ClientStream.class);
1324     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1325     retriableStream.start(masterListener);
1326     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1327         ArgumentCaptor.forClass(ClientStreamListener.class);
1328     verify(mockStream).start(sublistenerCaptor.capture());
1329 
1330     // mimic some other call in the channel triggers a throttle countdown
1331     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1332 
1333     Metadata headers = new Metadata();
1334     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "");
1335     sublistenerCaptor.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers);
1336     verify(retriableStreamRecorder).postCommit();
1337     assertFalse(throttle.isAboveThreshold()); // count = 2
1338   }
1339 
1340   @Test
throttledStream_FailWithNonRetriableStatusCode_WithNonRetriablePushback()1341   public void throttledStream_FailWithNonRetriableStatusCode_WithNonRetriablePushback() {
1342     Throttle throttle = new Throttle(4f, 0.8f);
1343     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1344 
1345     ClientStream mockStream = mock(ClientStream.class);
1346     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1347     retriableStream.start(masterListener);
1348     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1349         ArgumentCaptor.forClass(ClientStreamListener.class);
1350     verify(mockStream).start(sublistenerCaptor.capture());
1351 
1352     // mimic some other call in the channel triggers a throttle countdown
1353     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1354 
1355     Metadata headers = new Metadata();
1356     headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "");
1357     sublistenerCaptor.getValue().closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), headers);
1358     verify(retriableStreamRecorder).postCommit();
1359     assertFalse(throttle.isAboveThreshold()); // count = 2
1360   }
1361 
1362   @Test
throttleStream_Succeed()1363   public void throttleStream_Succeed() {
1364     Throttle throttle = new Throttle(4f, 0.8f);
1365     RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle);
1366 
1367     ClientStream mockStream = mock(ClientStream.class);
1368     doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt());
1369     retriableStream.start(masterListener);
1370     ArgumentCaptor<ClientStreamListener> sublistenerCaptor =
1371         ArgumentCaptor.forClass(ClientStreamListener.class);
1372     verify(mockStream).start(sublistenerCaptor.capture());
1373 
1374     // mimic some other calls in the channel trigger throttle countdowns
1375     assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3
1376     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 2
1377     assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 1
1378 
1379     sublistenerCaptor.getValue().headersRead(new Metadata());
1380     verify(retriableStreamRecorder).postCommit();
1381     assertFalse(throttle.isAboveThreshold());  // count = 1.8
1382 
1383     // mimic some other call in the channel triggers a success
1384     throttle.onSuccess();
1385     assertTrue(throttle.isAboveThreshold()); // count = 2.6
1386   }
1387 
1388   @Test
transparentRetry()1389   public void transparentRetry() {
1390     ClientStream mockStream1 = mock(ClientStream.class);
1391     ClientStream mockStream2 = mock(ClientStream.class);
1392     ClientStream mockStream3 = mock(ClientStream.class);
1393     InOrder inOrder = inOrder(
1394         retriableStreamRecorder,
1395         mockStream1, mockStream2, mockStream3);
1396 
1397     // start
1398     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
1399     retriableStream.start(masterListener);
1400 
1401     inOrder.verify(retriableStreamRecorder).newSubstream(0);
1402     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1403         ArgumentCaptor.forClass(ClientStreamListener.class);
1404     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
1405     inOrder.verifyNoMoreInteractions();
1406 
1407     // transparent retry
1408     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0);
1409     sublistenerCaptor1.getValue()
1410         .closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), REFUSED, new Metadata());
1411 
1412     inOrder.verify(retriableStreamRecorder).newSubstream(0);
1413     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
1414         ArgumentCaptor.forClass(ClientStreamListener.class);
1415     inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
1416     inOrder.verifyNoMoreInteractions();
1417     verify(retriableStreamRecorder, never()).postCommit();
1418     assertEquals(0, fakeClock.numPendingTasks());
1419 
1420     // no more transparent retry
1421     doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(1);
1422     sublistenerCaptor2.getValue()
1423         .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata());
1424 
1425     assertEquals(1, fakeClock.numPendingTasks());
1426     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
1427     inOrder.verify(retriableStreamRecorder).newSubstream(1);
1428     ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
1429         ArgumentCaptor.forClass(ClientStreamListener.class);
1430     inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
1431     inOrder.verifyNoMoreInteractions();
1432     verify(retriableStreamRecorder, never()).postCommit();
1433     assertEquals(0, fakeClock.numPendingTasks());
1434   }
1435 
1436   @Test
normalRetry_thenNoTransparentRetry_butNormalRetry()1437   public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
1438     ClientStream mockStream1 = mock(ClientStream.class);
1439     ClientStream mockStream2 = mock(ClientStream.class);
1440     ClientStream mockStream3 = mock(ClientStream.class);
1441     InOrder inOrder = inOrder(
1442         retriableStreamRecorder,
1443         mockStream1, mockStream2, mockStream3);
1444 
1445     // start
1446     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
1447     retriableStream.start(masterListener);
1448 
1449     inOrder.verify(retriableStreamRecorder).newSubstream(0);
1450     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1451         ArgumentCaptor.forClass(ClientStreamListener.class);
1452     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
1453     inOrder.verifyNoMoreInteractions();
1454 
1455     // normal retry
1456     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
1457     sublistenerCaptor1.getValue()
1458         .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
1459 
1460     assertEquals(1, fakeClock.numPendingTasks());
1461     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
1462     inOrder.verify(retriableStreamRecorder).newSubstream(1);
1463     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
1464         ArgumentCaptor.forClass(ClientStreamListener.class);
1465     inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
1466     inOrder.verifyNoMoreInteractions();
1467     verify(retriableStreamRecorder, never()).postCommit();
1468     assertEquals(0, fakeClock.numPendingTasks());
1469 
1470     // no more transparent retry
1471     doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
1472     sublistenerCaptor2.getValue()
1473         .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata());
1474 
1475     assertEquals(1, fakeClock.numPendingTasks());
1476     fakeClock.forwardTime(
1477         (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS);
1478     inOrder.verify(retriableStreamRecorder).newSubstream(2);
1479     ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
1480         ArgumentCaptor.forClass(ClientStreamListener.class);
1481     inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
1482     inOrder.verifyNoMoreInteractions();
1483     verify(retriableStreamRecorder, never()).postCommit();
1484   }
1485 
1486   @Test
normalRetry_thenNoTransparentRetry_andNoMoreRetry()1487   public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() {
1488     ClientStream mockStream1 = mock(ClientStream.class);
1489     ClientStream mockStream2 = mock(ClientStream.class);
1490     ClientStream mockStream3 = mock(ClientStream.class);
1491     InOrder inOrder = inOrder(
1492         retriableStreamRecorder,
1493         mockStream1, mockStream2, mockStream3);
1494 
1495     // start
1496     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
1497     retriableStream.start(masterListener);
1498 
1499     inOrder.verify(retriableStreamRecorder).newSubstream(0);
1500     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1501         ArgumentCaptor.forClass(ClientStreamListener.class);
1502     inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
1503     inOrder.verifyNoMoreInteractions();
1504 
1505     // normal retry
1506     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
1507     sublistenerCaptor1.getValue()
1508         .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
1509 
1510     assertEquals(1, fakeClock.numPendingTasks());
1511     fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
1512     inOrder.verify(retriableStreamRecorder).newSubstream(1);
1513     ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
1514         ArgumentCaptor.forClass(ClientStreamListener.class);
1515     inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
1516     inOrder.verifyNoMoreInteractions();
1517     verify(retriableStreamRecorder, never()).postCommit();
1518     assertEquals(0, fakeClock.numPendingTasks());
1519 
1520     // no more transparent retry
1521     doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
1522     sublistenerCaptor2.getValue()
1523         .closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), REFUSED, new Metadata());
1524 
1525     verify(retriableStreamRecorder).postCommit();
1526   }
1527 
1528   @Test
droppedShouldNeverRetry()1529   public void droppedShouldNeverRetry() {
1530     ClientStream mockStream1 = mock(ClientStream.class);
1531     ClientStream mockStream2 = mock(ClientStream.class);
1532     doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
1533     doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
1534 
1535     // start
1536     retriableStream.start(masterListener);
1537 
1538     verify(retriableStreamRecorder).newSubstream(0);
1539     ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1540         ArgumentCaptor.forClass(ClientStreamListener.class);
1541     verify(mockStream1).start(sublistenerCaptor1.capture());
1542 
1543     // drop and verify no retry
1544     Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
1545     sublistenerCaptor1.getValue().closed(status, DROPPED, new Metadata());
1546 
1547     verifyNoMoreInteractions(mockStream1, mockStream2);
1548     verify(retriableStreamRecorder).postCommit();
1549     verify(masterListener).closed(same(status), any(Metadata.class));
1550   }
1551 
1552   /**
1553    * Used to stub a retriable stream as well as to record methods of the retriable stream being
1554    * called.
1555    */
1556   private interface RetriableStreamRecorder {
postCommit()1557     void postCommit();
1558 
newSubstream(int previousAttempts)1559     ClientStream newSubstream(int previousAttempts);
1560 
prestart()1561     Status prestart();
1562   }
1563 }
1564