1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.internal.ClientStreamListener.RpcProgress.DROPPED; 21 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; 22 import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; 23 import static io.grpc.internal.RetriableStream.GRPC_PREVIOUS_RPC_ATTEMPTS; 24 import static org.junit.Assert.assertEquals; 25 import static org.junit.Assert.assertFalse; 26 import static org.junit.Assert.assertNotSame; 27 import static org.junit.Assert.assertNull; 28 import static org.junit.Assert.assertTrue; 29 import static org.mockito.AdditionalAnswers.delegatesTo; 30 import static org.mockito.Matchers.any; 31 import static org.mockito.Matchers.anyInt; 32 import static org.mockito.Matchers.same; 33 import static org.mockito.Mockito.doReturn; 34 import static org.mockito.Mockito.inOrder; 35 import static org.mockito.Mockito.mock; 36 import static org.mockito.Mockito.never; 37 import static org.mockito.Mockito.times; 38 import static org.mockito.Mockito.verify; 39 import static org.mockito.Mockito.verifyNoMoreInteractions; 40 import static org.mockito.Mockito.when; 41 42 import com.google.common.collect.ImmutableSet; 43 import com.google.common.util.concurrent.MoreExecutors; 44 import io.grpc.CallOptions; 45 import io.grpc.ClientStreamTracer; 46 import io.grpc.Codec; 47 import io.grpc.Compressor; 48 import io.grpc.DecompressorRegistry; 49 import io.grpc.Metadata; 50 import io.grpc.MethodDescriptor; 51 import io.grpc.MethodDescriptor.MethodType; 52 import io.grpc.Status; 53 import io.grpc.Status.Code; 54 import io.grpc.StringMarshaller; 55 import io.grpc.internal.RetriableStream.ChannelBufferMeter; 56 import io.grpc.internal.RetriableStream.Throttle; 57 import io.grpc.internal.StreamListener.MessageProducer; 58 import java.io.InputStream; 59 import java.util.ArrayList; 60 import java.util.List; 61 import java.util.Random; 62 import java.util.concurrent.Executor; 63 import java.util.concurrent.ScheduledExecutorService; 64 import java.util.concurrent.TimeUnit; 65 import java.util.concurrent.atomic.AtomicReference; 66 import javax.annotation.Nullable; 67 import org.junit.After; 68 import org.junit.Test; 69 import org.junit.runner.RunWith; 70 import org.junit.runners.JUnit4; 71 import org.mockito.ArgumentCaptor; 72 import org.mockito.InOrder; 73 74 /** Unit tests for {@link RetriableStream}. */ 75 @RunWith(JUnit4.class) 76 public class RetriableStreamTest { 77 private static final String CANCELLED_BECAUSE_COMMITTED = 78 "Stream thrown away because RetriableStream committed"; 79 private static final String AUTHORITY = "fakeAuthority"; 80 private static final Compressor COMPRESSOR = Codec.Identity.NONE; 81 private static final DecompressorRegistry DECOMPRESSOR_REGISTRY = 82 DecompressorRegistry.getDefaultInstance(); 83 private static final int MAX_INBOUND_MESSAGE_SIZE = 1234; 84 private static final int MAX_OUTBOUND_MESSAGE_SIZE = 5678; 85 private static final long PER_RPC_BUFFER_LIMIT = 1000; 86 private static final long CHANNEL_BUFFER_LIMIT = 2000; 87 private static final int MAX_ATTEMPTS = 6; 88 private static final long INITIAL_BACKOFF_IN_SECONDS = 100; 89 private static final long MAX_BACKOFF_IN_SECONDS = 700; 90 private static final double BACKOFF_MULTIPLIER = 2D; 91 private static final double FAKE_RANDOM = .5D; 92 93 static { RetriableStream.setRandom( new Random() { @Override public double nextDouble() { return FAKE_RANDOM; } })94 RetriableStream.setRandom( 95 // not random 96 new Random() { 97 @Override 98 public double nextDouble() { 99 return FAKE_RANDOM; 100 } 101 }); 102 } 103 104 private static final Code RETRIABLE_STATUS_CODE_1 = Code.UNAVAILABLE; 105 private static final Code RETRIABLE_STATUS_CODE_2 = Code.DATA_LOSS; 106 private static final Code NON_RETRIABLE_STATUS_CODE = Code.INTERNAL; 107 private static final RetryPolicy RETRY_POLICY = 108 new RetryPolicy( 109 MAX_ATTEMPTS, 110 TimeUnit.SECONDS.toNanos(INITIAL_BACKOFF_IN_SECONDS), 111 TimeUnit.SECONDS.toNanos(MAX_BACKOFF_IN_SECONDS), 112 BACKOFF_MULTIPLIER, 113 ImmutableSet.of(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2)); 114 115 private final RetriableStreamRecorder retriableStreamRecorder = 116 mock(RetriableStreamRecorder.class); 117 private final ClientStreamListener masterListener = mock(ClientStreamListener.class); 118 private final MethodDescriptor<String, String> method = 119 MethodDescriptor.<String, String>newBuilder() 120 .setType(MethodType.BIDI_STREAMING) 121 .setFullMethodName(MethodDescriptor.generateFullMethodName("service_foo", "method_bar")) 122 .setRequestMarshaller(new StringMarshaller()) 123 .setResponseMarshaller(new StringMarshaller()) 124 .build(); 125 private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); 126 private final FakeClock fakeClock = new FakeClock(); 127 128 private final class RecordedRetriableStream extends RetriableStream<String> { RecordedRetriableStream(MethodDescriptor<String, ?> method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, Executor callExecutor, ScheduledExecutorService scheduledExecutorService, final RetryPolicy retryPolicy, final HedgingPolicy hedgingPolicy, @Nullable Throttle throttle)129 RecordedRetriableStream(MethodDescriptor<String, ?> method, Metadata headers, 130 ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, 131 Executor callExecutor, 132 ScheduledExecutorService scheduledExecutorService, 133 final RetryPolicy retryPolicy, 134 final HedgingPolicy hedgingPolicy, 135 @Nullable Throttle throttle) { 136 super( 137 method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, callExecutor, 138 scheduledExecutorService, 139 new RetryPolicy.Provider() { 140 @Override 141 public RetryPolicy get() { 142 return retryPolicy; 143 } 144 }, 145 new HedgingPolicy.Provider() { 146 @Override 147 public HedgingPolicy get() { 148 return hedgingPolicy; 149 } 150 }, 151 throttle); 152 } 153 154 @Override postCommit()155 void postCommit() { 156 retriableStreamRecorder.postCommit(); 157 } 158 159 @Override newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata)160 ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) { 161 bufferSizeTracer = 162 tracerFactory.newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); 163 int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null 164 ? 0 : Integer.valueOf(metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS)); 165 return retriableStreamRecorder.newSubstream(actualPreviousRpcAttemptsInHeader); 166 } 167 168 @Override prestart()169 Status prestart() { 170 return retriableStreamRecorder.prestart(); 171 } 172 } 173 174 private final RetriableStream<String> retriableStream = 175 newThrottledRetriableStream(null /* throttle */); 176 177 private ClientStreamTracer bufferSizeTracer; 178 newThrottledRetriableStream(Throttle throttle)179 private RetriableStream<String> newThrottledRetriableStream(Throttle throttle) { 180 return new RecordedRetriableStream( 181 method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT, 182 MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY, 183 HedgingPolicy.DEFAULT, throttle); 184 } 185 186 @After tearDown()187 public void tearDown() { 188 assertEquals(0, fakeClock.numPendingTasks()); 189 } 190 191 @Test retry_everythingDrained()192 public void retry_everythingDrained() { 193 ClientStream mockStream1 = mock(ClientStream.class); 194 ClientStream mockStream2 = mock(ClientStream.class); 195 ClientStream mockStream3 = mock(ClientStream.class); 196 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 197 InOrder inOrder = 198 inOrder(retriableStreamRecorder, masterListener, mockStream1, mockStream2, mockStream3); 199 200 // stream settings before start 201 retriableStream.setAuthority(AUTHORITY); 202 retriableStream.setCompressor(COMPRESSOR); 203 retriableStream.setDecompressorRegistry(DECOMPRESSOR_REGISTRY); 204 retriableStream.setFullStreamDecompression(false); 205 retriableStream.setFullStreamDecompression(true); 206 retriableStream.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE); 207 retriableStream.setMessageCompression(true); 208 retriableStream.setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE); 209 retriableStream.setMessageCompression(false); 210 211 inOrder.verifyNoMoreInteractions(); 212 213 // start 214 retriableStream.start(masterListener); 215 216 inOrder.verify(retriableStreamRecorder).prestart(); 217 inOrder.verify(retriableStreamRecorder).newSubstream(0); 218 219 inOrder.verify(mockStream1).setAuthority(AUTHORITY); 220 inOrder.verify(mockStream1).setCompressor(COMPRESSOR); 221 inOrder.verify(mockStream1).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); 222 inOrder.verify(mockStream1).setFullStreamDecompression(false); 223 inOrder.verify(mockStream1).setFullStreamDecompression(true); 224 inOrder.verify(mockStream1).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE); 225 inOrder.verify(mockStream1).setMessageCompression(true); 226 inOrder.verify(mockStream1).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE); 227 inOrder.verify(mockStream1).setMessageCompression(false); 228 229 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 230 ArgumentCaptor.forClass(ClientStreamListener.class); 231 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 232 inOrder.verifyNoMoreInteractions(); 233 234 retriableStream.sendMessage("msg1"); 235 retriableStream.sendMessage("msg2"); 236 retriableStream.request(345); 237 retriableStream.flush(); 238 retriableStream.flush(); 239 retriableStream.sendMessage("msg3"); 240 retriableStream.request(456); 241 242 inOrder.verify(mockStream1, times(2)).writeMessage(any(InputStream.class)); 243 inOrder.verify(mockStream1).request(345); 244 inOrder.verify(mockStream1, times(2)).flush(); 245 inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); 246 inOrder.verify(mockStream1).request(456); 247 inOrder.verifyNoMoreInteractions(); 248 249 // retry1 250 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 251 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata()); 252 assertEquals(1, fakeClock.numPendingTasks()); 253 254 // send more messages during backoff 255 retriableStream.sendMessage("msg1 during backoff1"); 256 retriableStream.sendMessage("msg2 during backoff1"); 257 258 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); 259 inOrder.verifyNoMoreInteractions(); 260 assertEquals(1, fakeClock.numPendingTasks()); 261 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 262 assertEquals(0, fakeClock.numPendingTasks()); 263 inOrder.verify(retriableStreamRecorder).newSubstream(1); 264 inOrder.verify(mockStream2).setAuthority(AUTHORITY); 265 inOrder.verify(mockStream2).setCompressor(COMPRESSOR); 266 inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); 267 inOrder.verify(mockStream2).setFullStreamDecompression(false); 268 inOrder.verify(mockStream2).setFullStreamDecompression(true); 269 inOrder.verify(mockStream2).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE); 270 inOrder.verify(mockStream2).setMessageCompression(true); 271 inOrder.verify(mockStream2).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE); 272 inOrder.verify(mockStream2).setMessageCompression(false); 273 274 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 275 ArgumentCaptor.forClass(ClientStreamListener.class); 276 inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); 277 inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); 278 inOrder.verify(mockStream2).request(345); 279 inOrder.verify(mockStream2, times(2)).flush(); 280 inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); 281 inOrder.verify(mockStream2).request(456); 282 inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); 283 inOrder.verifyNoMoreInteractions(); 284 285 // send more messages 286 retriableStream.sendMessage("msg1 after retry1"); 287 retriableStream.sendMessage("msg2 after retry1"); 288 289 // mockStream1 is closed so it is not in the drainedSubstreams 290 verifyNoMoreInteractions(mockStream1); 291 inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); 292 293 // retry2 294 doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); 295 sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 296 assertEquals(1, fakeClock.numPendingTasks()); 297 298 // send more messages during backoff 299 retriableStream.sendMessage("msg1 during backoff2"); 300 retriableStream.sendMessage("msg2 during backoff2"); 301 retriableStream.sendMessage("msg3 during backoff2"); 302 303 fakeClock.forwardTime( 304 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, 305 TimeUnit.SECONDS); 306 inOrder.verifyNoMoreInteractions(); 307 assertEquals(1, fakeClock.numPendingTasks()); 308 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 309 assertEquals(0, fakeClock.numPendingTasks()); 310 inOrder.verify(retriableStreamRecorder).newSubstream(2); 311 inOrder.verify(mockStream3).setAuthority(AUTHORITY); 312 inOrder.verify(mockStream3).setCompressor(COMPRESSOR); 313 inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); 314 inOrder.verify(mockStream3).setFullStreamDecompression(false); 315 inOrder.verify(mockStream3).setFullStreamDecompression(true); 316 inOrder.verify(mockStream3).setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE); 317 inOrder.verify(mockStream3).setMessageCompression(true); 318 inOrder.verify(mockStream3).setMaxOutboundMessageSize(MAX_OUTBOUND_MESSAGE_SIZE); 319 inOrder.verify(mockStream3).setMessageCompression(false); 320 321 ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = 322 ArgumentCaptor.forClass(ClientStreamListener.class); 323 inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); 324 inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); 325 inOrder.verify(mockStream3).request(345); 326 inOrder.verify(mockStream3, times(2)).flush(); 327 inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); 328 inOrder.verify(mockStream3).request(456); 329 inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); 330 inOrder.verifyNoMoreInteractions(); 331 332 // no more retry 333 sublistenerCaptor3.getValue().closed( 334 Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata()); 335 336 inOrder.verify(retriableStreamRecorder).postCommit(); 337 inOrder.verify(masterListener).closed(any(Status.class), any(Metadata.class)); 338 inOrder.verifyNoMoreInteractions(); 339 } 340 341 @Test headersRead_cancel()342 public void headersRead_cancel() { 343 ClientStream mockStream1 = mock(ClientStream.class); 344 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 345 InOrder inOrder = inOrder(retriableStreamRecorder); 346 347 retriableStream.start(masterListener); 348 349 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 350 ArgumentCaptor.forClass(ClientStreamListener.class); 351 verify(mockStream1).start(sublistenerCaptor1.capture()); 352 353 sublistenerCaptor1.getValue().headersRead(new Metadata()); 354 355 inOrder.verify(retriableStreamRecorder).postCommit(); 356 357 retriableStream.cancel(Status.CANCELLED); 358 359 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 360 } 361 362 @Test retry_headersRead_cancel()363 public void retry_headersRead_cancel() { 364 ClientStream mockStream1 = mock(ClientStream.class); 365 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 366 InOrder inOrder = inOrder(retriableStreamRecorder); 367 368 retriableStream.start(masterListener); 369 370 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 371 ArgumentCaptor.forClass(ClientStreamListener.class); 372 verify(mockStream1).start(sublistenerCaptor1.capture()); 373 374 // retry 375 ClientStream mockStream2 = mock(ClientStream.class); 376 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 377 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 378 assertEquals(1, fakeClock.numPendingTasks()); 379 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 380 381 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 382 ArgumentCaptor.forClass(ClientStreamListener.class); 383 verify(mockStream2).start(sublistenerCaptor2.capture()); 384 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 385 386 // headersRead 387 sublistenerCaptor2.getValue().headersRead(new Metadata()); 388 389 inOrder.verify(retriableStreamRecorder).postCommit(); 390 391 // cancel 392 retriableStream.cancel(Status.CANCELLED); 393 394 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 395 } 396 397 @Test headersRead_closed()398 public void headersRead_closed() { 399 ClientStream mockStream1 = mock(ClientStream.class); 400 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 401 InOrder inOrder = inOrder(retriableStreamRecorder); 402 403 retriableStream.start(masterListener); 404 405 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 406 ArgumentCaptor.forClass(ClientStreamListener.class); 407 verify(mockStream1).start(sublistenerCaptor1.capture()); 408 409 sublistenerCaptor1.getValue().headersRead(new Metadata()); 410 411 inOrder.verify(retriableStreamRecorder).postCommit(); 412 413 Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); 414 Metadata metadata = new Metadata(); 415 sublistenerCaptor1.getValue().closed(status, metadata); 416 417 verify(masterListener).closed(status, metadata); 418 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 419 } 420 421 @Test retry_headersRead_closed()422 public void retry_headersRead_closed() { 423 ClientStream mockStream1 = mock(ClientStream.class); 424 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 425 InOrder inOrder = inOrder(retriableStreamRecorder); 426 427 retriableStream.start(masterListener); 428 429 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 430 ArgumentCaptor.forClass(ClientStreamListener.class); 431 verify(mockStream1).start(sublistenerCaptor1.capture()); 432 433 // retry 434 ClientStream mockStream2 = mock(ClientStream.class); 435 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 436 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 437 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 438 439 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 440 ArgumentCaptor.forClass(ClientStreamListener.class); 441 verify(mockStream2).start(sublistenerCaptor2.capture()); 442 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 443 444 // headersRead 445 sublistenerCaptor2.getValue().headersRead(new Metadata()); 446 447 inOrder.verify(retriableStreamRecorder).postCommit(); 448 449 // closed even with retriable status 450 Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); 451 Metadata metadata = new Metadata(); 452 sublistenerCaptor2.getValue().closed(status, metadata); 453 454 verify(masterListener).closed(status, metadata); 455 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 456 } 457 458 @Test cancel_closed()459 public void cancel_closed() { 460 ClientStream mockStream1 = mock(ClientStream.class); 461 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 462 InOrder inOrder = inOrder(retriableStreamRecorder); 463 464 retriableStream.start(masterListener); 465 466 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 467 ArgumentCaptor.forClass(ClientStreamListener.class); 468 verify(mockStream1).start(sublistenerCaptor1.capture()); 469 470 // cancel 471 retriableStream.cancel(Status.CANCELLED); 472 473 inOrder.verify(retriableStreamRecorder).postCommit(); 474 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 475 verify(mockStream1).cancel(statusCaptor.capture()); 476 assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode()); 477 assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); 478 479 // closed even with retriable status 480 Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); 481 Metadata metadata = new Metadata(); 482 sublistenerCaptor1.getValue().closed(status, metadata); 483 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 484 } 485 486 @Test retry_cancel_closed()487 public void retry_cancel_closed() { 488 ClientStream mockStream1 = mock(ClientStream.class); 489 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 490 InOrder inOrder = inOrder(retriableStreamRecorder); 491 492 retriableStream.start(masterListener); 493 494 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 495 ArgumentCaptor.forClass(ClientStreamListener.class); 496 verify(mockStream1).start(sublistenerCaptor1.capture()); 497 498 // retry 499 ClientStream mockStream2 = mock(ClientStream.class); 500 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 501 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 502 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 503 504 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 505 ArgumentCaptor.forClass(ClientStreamListener.class); 506 verify(mockStream2).start(sublistenerCaptor2.capture()); 507 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 508 509 // cancel 510 retriableStream.cancel(Status.CANCELLED); 511 512 inOrder.verify(retriableStreamRecorder).postCommit(); 513 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 514 verify(mockStream2).cancel(statusCaptor.capture()); 515 assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode()); 516 assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); 517 518 // closed 519 Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE); 520 Metadata metadata = new Metadata(); 521 sublistenerCaptor2.getValue().closed(status, metadata); 522 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 523 } 524 525 @Test unretriableClosed_cancel()526 public void unretriableClosed_cancel() { 527 ClientStream mockStream1 = mock(ClientStream.class); 528 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 529 InOrder inOrder = inOrder(retriableStreamRecorder); 530 531 retriableStream.start(masterListener); 532 533 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 534 ArgumentCaptor.forClass(ClientStreamListener.class); 535 verify(mockStream1).start(sublistenerCaptor1.capture()); 536 537 // closed 538 Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE); 539 Metadata metadata = new Metadata(); 540 sublistenerCaptor1.getValue().closed(status, metadata); 541 542 inOrder.verify(retriableStreamRecorder).postCommit(); 543 verify(masterListener).closed(status, metadata); 544 545 // cancel 546 retriableStream.cancel(Status.CANCELLED); 547 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 548 } 549 550 @Test retry_unretriableClosed_cancel()551 public void retry_unretriableClosed_cancel() { 552 ClientStream mockStream1 = mock(ClientStream.class); 553 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 554 InOrder inOrder = inOrder(retriableStreamRecorder); 555 556 retriableStream.start(masterListener); 557 558 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 559 ArgumentCaptor.forClass(ClientStreamListener.class); 560 verify(mockStream1).start(sublistenerCaptor1.capture()); 561 562 // retry 563 ClientStream mockStream2 = mock(ClientStream.class); 564 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 565 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 566 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 567 568 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 569 ArgumentCaptor.forClass(ClientStreamListener.class); 570 verify(mockStream2).start(sublistenerCaptor2.capture()); 571 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 572 573 // closed 574 Status status = Status.fromCode(NON_RETRIABLE_STATUS_CODE); 575 Metadata metadata = new Metadata(); 576 sublistenerCaptor2.getValue().closed(status, metadata); 577 578 inOrder.verify(retriableStreamRecorder).postCommit(); 579 verify(masterListener).closed(status, metadata); 580 581 // cancel 582 retriableStream.cancel(Status.CANCELLED); 583 inOrder.verify(retriableStreamRecorder, never()).postCommit(); 584 } 585 586 @Test retry_cancelWhileBackoff()587 public void retry_cancelWhileBackoff() { 588 ClientStream mockStream1 = mock(ClientStream.class); 589 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 590 591 retriableStream.start(masterListener); 592 593 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 594 ArgumentCaptor.forClass(ClientStreamListener.class); 595 verify(mockStream1).start(sublistenerCaptor1.capture()); 596 597 // retry 598 ClientStream mockStream2 = mock(ClientStream.class); 599 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 600 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 601 602 // cancel while backoff 603 assertEquals(1, fakeClock.numPendingTasks()); 604 verify(retriableStreamRecorder, never()).postCommit(); 605 retriableStream.cancel(Status.CANCELLED); 606 verify(retriableStreamRecorder).postCommit(); 607 608 verifyNoMoreInteractions(mockStream1); 609 verifyNoMoreInteractions(mockStream2); 610 } 611 612 @Test operationsWhileDraining()613 public void operationsWhileDraining() { 614 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 615 ArgumentCaptor.forClass(ClientStreamListener.class); 616 final AtomicReference<ClientStreamListener> sublistenerCaptor2 = 617 new AtomicReference<ClientStreamListener>(); 618 final Status cancelStatus = Status.CANCELLED.withDescription("c"); 619 ClientStream mockStream1 = 620 mock( 621 ClientStream.class, 622 delegatesTo( 623 new NoopClientStream() { 624 @Override 625 public void request(int numMessages) { 626 retriableStream.sendMessage("substream1 request " + numMessages); 627 if (numMessages > 1) { 628 retriableStream.request(--numMessages); 629 } 630 } 631 })); 632 633 final ClientStream mockStream2 = 634 mock( 635 ClientStream.class, 636 delegatesTo( 637 new NoopClientStream() { 638 @Override 639 public void start(ClientStreamListener listener) { 640 sublistenerCaptor2.set(listener); 641 } 642 643 @Override 644 public void request(int numMessages) { 645 retriableStream.sendMessage("substream2 request " + numMessages); 646 647 if (numMessages == 3) { 648 sublistenerCaptor2.get().headersRead(new Metadata()); 649 } 650 if (numMessages == 2) { 651 retriableStream.request(100); 652 } 653 if (numMessages == 100) { 654 retriableStream.cancel(cancelStatus); 655 } 656 } 657 })); 658 659 InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); 660 661 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 662 retriableStream.start(masterListener); 663 664 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 665 666 retriableStream.request(3); 667 668 inOrder.verify(mockStream1).request(3); 669 inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 3" 670 inOrder.verify(mockStream1).request(2); 671 inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2" 672 inOrder.verify(mockStream1).request(1); 673 inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1" 674 675 // retry 676 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 677 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 678 assertEquals(1, fakeClock.numPendingTasks()); 679 680 // send more requests during backoff 681 retriableStream.request(789); 682 683 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 684 685 inOrder.verify(mockStream2).start(sublistenerCaptor2.get()); 686 inOrder.verify(mockStream2).request(3); 687 inOrder.verify(retriableStreamRecorder).postCommit(); 688 inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); // msg "substream1 request 3" 689 inOrder.verify(mockStream2).request(2); 690 inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); // msg "substream1 request 2" 691 inOrder.verify(mockStream2).request(1); 692 693 // msg "substream1 request 1" 694 inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); 695 inOrder.verify(mockStream2).request(789); 696 // msg "substream2 request 3" 697 // msg "substream2 request 2" 698 inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); 699 inOrder.verify(mockStream2).request(100); 700 701 verify(mockStream2).cancel(cancelStatus); 702 703 // "substream2 request 1" will never be sent 704 inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); 705 } 706 707 @Test operationsAfterImmediateCommit()708 public void operationsAfterImmediateCommit() { 709 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 710 ArgumentCaptor.forClass(ClientStreamListener.class); 711 ClientStream mockStream1 = mock(ClientStream.class); 712 713 InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1); 714 715 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 716 retriableStream.start(masterListener); 717 718 // drained 719 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 720 721 // commit 722 sublistenerCaptor1.getValue().headersRead(new Metadata()); 723 724 retriableStream.request(3); 725 inOrder.verify(mockStream1).request(3); 726 retriableStream.sendMessage("msg 1"); 727 inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); 728 } 729 730 @Test isReady_whenDrained()731 public void isReady_whenDrained() { 732 ClientStream mockStream1 = mock(ClientStream.class); 733 734 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 735 retriableStream.start(masterListener); 736 737 assertFalse(retriableStream.isReady()); 738 739 doReturn(true).when(mockStream1).isReady(); 740 741 assertTrue(retriableStream.isReady()); 742 } 743 744 @Test isReady_whileDraining()745 public void isReady_whileDraining() { 746 final AtomicReference<ClientStreamListener> sublistenerCaptor1 = 747 new AtomicReference<ClientStreamListener>(); 748 final List<Boolean> readiness = new ArrayList<>(); 749 ClientStream mockStream1 = 750 mock( 751 ClientStream.class, 752 delegatesTo( 753 new NoopClientStream() { 754 @Override 755 public void start(ClientStreamListener listener) { 756 sublistenerCaptor1.set(listener); 757 readiness.add(retriableStream.isReady()); // expected false b/c in draining 758 } 759 760 @Override 761 public boolean isReady() { 762 return true; 763 } 764 })); 765 766 final ClientStream mockStream2 = 767 mock( 768 ClientStream.class, 769 delegatesTo( 770 new NoopClientStream() { 771 @Override 772 public void start(ClientStreamListener listener) { 773 readiness.add(retriableStream.isReady()); // expected false b/c in draining 774 } 775 776 @Override 777 public boolean isReady() { 778 return true; 779 } 780 })); 781 782 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 783 retriableStream.start(masterListener); 784 785 verify(mockStream1).start(sublistenerCaptor1.get()); 786 readiness.add(retriableStream.isReady()); // expected true 787 788 // retry 789 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 790 doReturn(false).when(mockStream1).isReady(); // mockStream1 closed, so isReady false 791 sublistenerCaptor1.get().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 792 assertEquals(1, fakeClock.numPendingTasks()); 793 794 // send more requests during backoff 795 retriableStream.request(789); 796 readiness.add(retriableStream.isReady()); // expected false b/c in backoff 797 798 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 799 800 verify(mockStream2).start(any(ClientStreamListener.class)); 801 readiness.add(retriableStream.isReady()); // expected true 802 803 assertThat(readiness).containsExactly(false, true, false, false, true).inOrder(); 804 } 805 806 @Test messageAvailable()807 public void messageAvailable() { 808 ClientStream mockStream1 = mock(ClientStream.class); 809 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 810 811 retriableStream.start(masterListener); 812 813 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 814 ArgumentCaptor.forClass(ClientStreamListener.class); 815 verify(mockStream1).start(sublistenerCaptor1.capture()); 816 817 ClientStreamListener listener = sublistenerCaptor1.getValue(); 818 listener.headersRead(new Metadata()); 819 MessageProducer messageProducer = mock(MessageProducer.class); 820 listener.messagesAvailable(messageProducer); 821 verify(masterListener).messagesAvailable(messageProducer); 822 } 823 824 @Test closedWhileDraining()825 public void closedWhileDraining() { 826 ClientStream mockStream1 = mock(ClientStream.class); 827 final ClientStream mockStream2 = 828 mock( 829 ClientStream.class, 830 delegatesTo( 831 new NoopClientStream() { 832 @Override 833 public void start(ClientStreamListener listener) { 834 // closed while draning 835 listener.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 836 } 837 })); 838 final ClientStream mockStream3 = mock(ClientStream.class); 839 840 when(retriableStreamRecorder.newSubstream(anyInt())) 841 .thenReturn(mockStream1, mockStream2, mockStream3); 842 843 retriableStream.start(masterListener); 844 retriableStream.sendMessage("msg1"); 845 retriableStream.sendMessage("msg2"); 846 847 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 848 ArgumentCaptor.forClass(ClientStreamListener.class); 849 verify(mockStream1).start(sublistenerCaptor1.capture()); 850 851 ClientStreamListener listener1 = sublistenerCaptor1.getValue(); 852 853 // retry 854 listener1.closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 855 assertEquals(1, fakeClock.numPendingTasks()); 856 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 857 assertEquals(1, fakeClock.numPendingTasks()); 858 859 // send requests during backoff 860 retriableStream.request(3); 861 fakeClock.forwardTime( 862 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS); 863 864 retriableStream.request(1); 865 verify(mockStream1, never()).request(anyInt()); 866 verify(mockStream2, never()).request(anyInt()); 867 verify(mockStream3).request(3); 868 verify(mockStream3).request(1); 869 } 870 871 872 @Test perRpcBufferLimitExceeded()873 public void perRpcBufferLimitExceeded() { 874 ClientStream mockStream1 = mock(ClientStream.class); 875 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 876 877 retriableStream.start(masterListener); 878 879 bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT); 880 881 assertEquals(PER_RPC_BUFFER_LIMIT, channelBufferUsed.addAndGet(0)); 882 883 verify(retriableStreamRecorder, never()).postCommit(); 884 bufferSizeTracer.outboundWireSize(2); 885 verify(retriableStreamRecorder).postCommit(); 886 887 // verify channel buffer is adjusted 888 assertEquals(0, channelBufferUsed.addAndGet(0)); 889 } 890 891 @Test perRpcBufferLimitExceededDuringBackoff()892 public void perRpcBufferLimitExceededDuringBackoff() { 893 ClientStream mockStream1 = mock(ClientStream.class); 894 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 895 896 retriableStream.start(masterListener); 897 898 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 899 ArgumentCaptor.forClass(ClientStreamListener.class); 900 verify(mockStream1).start(sublistenerCaptor1.capture()); 901 902 bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); 903 904 // retry 905 ClientStream mockStream2 = mock(ClientStream.class); 906 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 907 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 908 909 // bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed 910 assertEquals(1, fakeClock.numPendingTasks()); 911 bufferSizeTracer.outboundWireSize(2); 912 verify(retriableStreamRecorder, never()).postCommit(); 913 914 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 915 verify(mockStream2).start(any(ClientStreamListener.class)); 916 917 // bufferLimitExceeded 918 bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); 919 verify(retriableStreamRecorder, never()).postCommit(); 920 bufferSizeTracer.outboundWireSize(2); 921 verify(retriableStreamRecorder).postCommit(); 922 923 verifyNoMoreInteractions(mockStream1); 924 verifyNoMoreInteractions(mockStream2); 925 } 926 927 @Test channelBufferLimitExceeded()928 public void channelBufferLimitExceeded() { 929 ClientStream mockStream1 = mock(ClientStream.class); 930 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 931 932 retriableStream.start(masterListener); 933 934 bufferSizeTracer.outboundWireSize(100); 935 936 assertEquals(100, channelBufferUsed.addAndGet(0)); 937 938 channelBufferUsed.addAndGet(CHANNEL_BUFFER_LIMIT - 200); 939 verify(retriableStreamRecorder, never()).postCommit(); 940 bufferSizeTracer.outboundWireSize(100 + 1); 941 verify(retriableStreamRecorder).postCommit(); 942 943 // verify channel buffer is adjusted 944 assertEquals(CHANNEL_BUFFER_LIMIT - 200, channelBufferUsed.addAndGet(0)); 945 } 946 947 @Test updateHeaders()948 public void updateHeaders() { 949 Metadata originalHeaders = new Metadata(); 950 Metadata headers = retriableStream.updateHeaders(originalHeaders, 0); 951 assertNotSame(originalHeaders, headers); 952 assertNull(headers.get(GRPC_PREVIOUS_RPC_ATTEMPTS)); 953 954 headers = retriableStream.updateHeaders(originalHeaders, 345); 955 assertEquals("345", headers.get(GRPC_PREVIOUS_RPC_ATTEMPTS)); 956 assertNull(originalHeaders.get(GRPC_PREVIOUS_RPC_ATTEMPTS)); 957 } 958 959 @Test expBackoff_maxBackoff_maxRetryAttempts()960 public void expBackoff_maxBackoff_maxRetryAttempts() { 961 ClientStream mockStream1 = mock(ClientStream.class); 962 ClientStream mockStream2 = mock(ClientStream.class); 963 ClientStream mockStream3 = mock(ClientStream.class); 964 ClientStream mockStream4 = mock(ClientStream.class); 965 ClientStream mockStream5 = mock(ClientStream.class); 966 ClientStream mockStream6 = mock(ClientStream.class); 967 ClientStream mockStream7 = mock(ClientStream.class); 968 InOrder inOrder = inOrder( 969 mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7); 970 when(retriableStreamRecorder.newSubstream(anyInt())).thenReturn( 971 mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7); 972 973 retriableStream.start(masterListener); 974 assertEquals(0, fakeClock.numPendingTasks()); 975 verify(retriableStreamRecorder).newSubstream(0); 976 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 977 ArgumentCaptor.forClass(ClientStreamListener.class); 978 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 979 inOrder.verifyNoMoreInteractions(); 980 981 982 // retry1 983 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 984 assertEquals(1, fakeClock.numPendingTasks()); 985 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); 986 assertEquals(1, fakeClock.numPendingTasks()); 987 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 988 assertEquals(0, fakeClock.numPendingTasks()); 989 verify(retriableStreamRecorder).newSubstream(1); 990 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 991 ArgumentCaptor.forClass(ClientStreamListener.class); 992 inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); 993 inOrder.verifyNoMoreInteractions(); 994 995 // retry2 996 sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata()); 997 assertEquals(1, fakeClock.numPendingTasks()); 998 fakeClock.forwardTime( 999 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, 1000 TimeUnit.SECONDS); 1001 assertEquals(1, fakeClock.numPendingTasks()); 1002 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1003 assertEquals(0, fakeClock.numPendingTasks()); 1004 verify(retriableStreamRecorder).newSubstream(2); 1005 ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = 1006 ArgumentCaptor.forClass(ClientStreamListener.class); 1007 inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); 1008 inOrder.verifyNoMoreInteractions(); 1009 1010 // retry3 1011 sublistenerCaptor3.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 1012 assertEquals(1, fakeClock.numPendingTasks()); 1013 fakeClock.forwardTime( 1014 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM) 1015 - 1L, 1016 TimeUnit.SECONDS); 1017 assertEquals(1, fakeClock.numPendingTasks()); 1018 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1019 assertEquals(0, fakeClock.numPendingTasks()); 1020 verify(retriableStreamRecorder).newSubstream(3); 1021 ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 = 1022 ArgumentCaptor.forClass(ClientStreamListener.class); 1023 inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); 1024 inOrder.verifyNoMoreInteractions(); 1025 1026 // retry4 1027 sublistenerCaptor4.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata()); 1028 assertEquals(1, fakeClock.numPendingTasks()); 1029 fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); 1030 assertEquals(1, fakeClock.numPendingTasks()); 1031 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1032 assertEquals(0, fakeClock.numPendingTasks()); 1033 verify(retriableStreamRecorder).newSubstream(4); 1034 ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 = 1035 ArgumentCaptor.forClass(ClientStreamListener.class); 1036 inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); 1037 inOrder.verifyNoMoreInteractions(); 1038 1039 // retry5 1040 sublistenerCaptor5.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata()); 1041 assertEquals(1, fakeClock.numPendingTasks()); 1042 fakeClock.forwardTime((long) (MAX_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); 1043 assertEquals(1, fakeClock.numPendingTasks()); 1044 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1045 assertEquals(0, fakeClock.numPendingTasks()); 1046 verify(retriableStreamRecorder).newSubstream(5); 1047 ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 = 1048 ArgumentCaptor.forClass(ClientStreamListener.class); 1049 inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); 1050 inOrder.verifyNoMoreInteractions(); 1051 1052 // can not retry any more 1053 verify(retriableStreamRecorder, never()).postCommit(); 1054 sublistenerCaptor6.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 1055 verify(retriableStreamRecorder).postCommit(); 1056 inOrder.verifyNoMoreInteractions(); 1057 } 1058 1059 @Test pushback()1060 public void pushback() { 1061 ClientStream mockStream1 = mock(ClientStream.class); 1062 ClientStream mockStream2 = mock(ClientStream.class); 1063 ClientStream mockStream3 = mock(ClientStream.class); 1064 ClientStream mockStream4 = mock(ClientStream.class); 1065 ClientStream mockStream5 = mock(ClientStream.class); 1066 ClientStream mockStream6 = mock(ClientStream.class); 1067 ClientStream mockStream7 = mock(ClientStream.class); 1068 InOrder inOrder = inOrder( 1069 mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7); 1070 when(retriableStreamRecorder.newSubstream(anyInt())).thenReturn( 1071 mockStream1, mockStream2, mockStream3, mockStream4, mockStream5, mockStream6, mockStream7); 1072 1073 retriableStream.start(masterListener); 1074 assertEquals(0, fakeClock.numPendingTasks()); 1075 verify(retriableStreamRecorder).newSubstream(0); 1076 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 1077 ArgumentCaptor.forClass(ClientStreamListener.class); 1078 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 1079 inOrder.verifyNoMoreInteractions(); 1080 1081 1082 // retry1 1083 int pushbackInMillis = 123; 1084 Metadata headers = new Metadata(); 1085 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis); 1086 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers); 1087 assertEquals(1, fakeClock.numPendingTasks()); 1088 fakeClock.forwardTime(pushbackInMillis - 1, TimeUnit.MILLISECONDS); 1089 assertEquals(1, fakeClock.numPendingTasks()); 1090 fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS); 1091 assertEquals(0, fakeClock.numPendingTasks()); 1092 verify(retriableStreamRecorder).newSubstream(1); 1093 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 1094 ArgumentCaptor.forClass(ClientStreamListener.class); 1095 inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); 1096 inOrder.verifyNoMoreInteractions(); 1097 1098 // retry2 1099 pushbackInMillis = 4567 * 1000; 1100 headers = new Metadata(); 1101 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis); 1102 sublistenerCaptor2.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), headers); 1103 assertEquals(1, fakeClock.numPendingTasks()); 1104 fakeClock.forwardTime(pushbackInMillis - 1, TimeUnit.MILLISECONDS); 1105 assertEquals(1, fakeClock.numPendingTasks()); 1106 fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS); 1107 assertEquals(0, fakeClock.numPendingTasks()); 1108 verify(retriableStreamRecorder).newSubstream(2); 1109 ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = 1110 ArgumentCaptor.forClass(ClientStreamListener.class); 1111 inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); 1112 inOrder.verifyNoMoreInteractions(); 1113 1114 // retry3 1115 sublistenerCaptor3.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 1116 assertEquals(1, fakeClock.numPendingTasks()); 1117 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM) - 1L, TimeUnit.SECONDS); 1118 assertEquals(1, fakeClock.numPendingTasks()); 1119 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1120 assertEquals(0, fakeClock.numPendingTasks()); 1121 verify(retriableStreamRecorder).newSubstream(3); 1122 ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 = 1123 ArgumentCaptor.forClass(ClientStreamListener.class); 1124 inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); 1125 inOrder.verifyNoMoreInteractions(); 1126 1127 // retry4 1128 sublistenerCaptor4.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata()); 1129 assertEquals(1, fakeClock.numPendingTasks()); 1130 fakeClock.forwardTime( 1131 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM) - 1L, 1132 TimeUnit.SECONDS); 1133 assertEquals(1, fakeClock.numPendingTasks()); 1134 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1135 assertEquals(0, fakeClock.numPendingTasks()); 1136 verify(retriableStreamRecorder).newSubstream(4); 1137 ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 = 1138 ArgumentCaptor.forClass(ClientStreamListener.class); 1139 inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); 1140 inOrder.verifyNoMoreInteractions(); 1141 1142 // retry5 1143 sublistenerCaptor5.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_2), new Metadata()); 1144 assertEquals(1, fakeClock.numPendingTasks()); 1145 fakeClock.forwardTime( 1146 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * BACKOFF_MULTIPLIER * FAKE_RANDOM) 1147 - 1L, 1148 TimeUnit.SECONDS); 1149 assertEquals(1, fakeClock.numPendingTasks()); 1150 fakeClock.forwardTime(1L, TimeUnit.SECONDS); 1151 assertEquals(0, fakeClock.numPendingTasks()); 1152 verify(retriableStreamRecorder).newSubstream(5); 1153 ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 = 1154 ArgumentCaptor.forClass(ClientStreamListener.class); 1155 inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); 1156 inOrder.verifyNoMoreInteractions(); 1157 1158 // can not retry any more even pushback is positive 1159 pushbackInMillis = 4567 * 1000; 1160 headers = new Metadata(); 1161 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis); 1162 verify(retriableStreamRecorder, never()).postCommit(); 1163 sublistenerCaptor6.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers); 1164 verify(retriableStreamRecorder).postCommit(); 1165 inOrder.verifyNoMoreInteractions(); 1166 } 1167 1168 @Test pushback_noRetry()1169 public void pushback_noRetry() { 1170 ClientStream mockStream1 = mock(ClientStream.class); 1171 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(anyInt()); 1172 1173 retriableStream.start(masterListener); 1174 assertEquals(0, fakeClock.numPendingTasks()); 1175 verify(retriableStreamRecorder).newSubstream(0); 1176 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 1177 ArgumentCaptor.forClass(ClientStreamListener.class); 1178 verify(mockStream1).start(sublistenerCaptor1.capture()); 1179 verify(retriableStreamRecorder, never()).postCommit(); 1180 1181 // pushback no retry 1182 Metadata headers = new Metadata(); 1183 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, ""); 1184 sublistenerCaptor1.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers); 1185 1186 verify(retriableStreamRecorder, never()).newSubstream(1); 1187 verify(retriableStreamRecorder).postCommit(); 1188 } 1189 1190 @Test throttle()1191 public void throttle() { 1192 Throttle throttle = new Throttle(4f, 0.8f); 1193 assertTrue(throttle.isAboveThreshold()); 1194 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 3 1195 assertTrue(throttle.isAboveThreshold()); 1196 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 2 1197 assertFalse(throttle.isAboveThreshold()); 1198 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 1 1199 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 0 1200 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 0 1201 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 0 1202 assertFalse(throttle.isAboveThreshold()); 1203 1204 throttle.onSuccess(); // token = 0.8 1205 assertFalse(throttle.isAboveThreshold()); 1206 throttle.onSuccess(); // token = 1.6 1207 assertFalse(throttle.isAboveThreshold()); 1208 throttle.onSuccess(); // token = 3.2 1209 assertTrue(throttle.isAboveThreshold()); 1210 throttle.onSuccess(); // token = 4 1211 assertTrue(throttle.isAboveThreshold()); 1212 throttle.onSuccess(); // token = 4 1213 assertTrue(throttle.isAboveThreshold()); 1214 throttle.onSuccess(); // token = 4 1215 assertTrue(throttle.isAboveThreshold()); 1216 1217 assertTrue(throttle.isAboveThreshold()); 1218 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 3 1219 assertTrue(throttle.isAboveThreshold()); 1220 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // token = 2 1221 assertFalse(throttle.isAboveThreshold()); 1222 } 1223 1224 @Test throttledStream_FailWithRetriableStatusCode_WithoutPushback()1225 public void throttledStream_FailWithRetriableStatusCode_WithoutPushback() { 1226 Throttle throttle = new Throttle(4f, 0.8f); 1227 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1228 1229 ClientStream mockStream = mock(ClientStream.class); 1230 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1231 retriableStream.start(masterListener); 1232 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1233 ArgumentCaptor.forClass(ClientStreamListener.class); 1234 verify(mockStream).start(sublistenerCaptor.capture()); 1235 1236 // mimic some other call in the channel triggers a throttle countdown 1237 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1238 1239 sublistenerCaptor.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), new Metadata()); 1240 verify(retriableStreamRecorder).postCommit(); 1241 assertFalse(throttle.isAboveThreshold()); // count = 2 1242 } 1243 1244 @Test throttledStream_FailWithNonRetriableStatusCode_WithoutPushback()1245 public void throttledStream_FailWithNonRetriableStatusCode_WithoutPushback() { 1246 Throttle throttle = new Throttle(4f, 0.8f); 1247 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1248 1249 ClientStream mockStream = mock(ClientStream.class); 1250 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1251 retriableStream.start(masterListener); 1252 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1253 ArgumentCaptor.forClass(ClientStreamListener.class); 1254 verify(mockStream).start(sublistenerCaptor.capture()); 1255 1256 // mimic some other call in the channel triggers a throttle countdown 1257 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1258 1259 sublistenerCaptor.getValue().closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), new Metadata()); 1260 verify(retriableStreamRecorder).postCommit(); 1261 assertTrue(throttle.isAboveThreshold()); // count = 3 1262 1263 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 2 1264 } 1265 1266 @Test throttledStream_FailWithRetriableStatusCode_WithRetriablePushback()1267 public void throttledStream_FailWithRetriableStatusCode_WithRetriablePushback() { 1268 Throttle throttle = new Throttle(4f, 0.8f); 1269 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1270 1271 ClientStream mockStream = mock(ClientStream.class); 1272 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1273 retriableStream.start(masterListener); 1274 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1275 ArgumentCaptor.forClass(ClientStreamListener.class); 1276 verify(mockStream).start(sublistenerCaptor.capture()); 1277 1278 // mimic some other call in the channel triggers a throttle countdown 1279 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1280 1281 int pushbackInMillis = 123; 1282 Metadata headers = new Metadata(); 1283 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis); 1284 sublistenerCaptor.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers); 1285 verify(retriableStreamRecorder).postCommit(); 1286 assertFalse(throttle.isAboveThreshold()); // count = 2 1287 } 1288 1289 @Test throttledStream_FailWithNonRetriableStatusCode_WithRetriablePushback()1290 public void throttledStream_FailWithNonRetriableStatusCode_WithRetriablePushback() { 1291 Throttle throttle = new Throttle(4f, 0.8f); 1292 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1293 1294 ClientStream mockStream = mock(ClientStream.class); 1295 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1296 retriableStream.start(masterListener); 1297 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1298 ArgumentCaptor.forClass(ClientStreamListener.class); 1299 verify(mockStream).start(sublistenerCaptor.capture()); 1300 1301 // mimic some other call in the channel triggers a throttle countdown 1302 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1303 1304 int pushbackInMillis = 123; 1305 Metadata headers = new Metadata(); 1306 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, "" + pushbackInMillis); 1307 sublistenerCaptor.getValue().closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), headers); 1308 verify(retriableStreamRecorder, never()).postCommit(); 1309 assertTrue(throttle.isAboveThreshold()); // count = 3 1310 1311 // drain pending retry 1312 fakeClock.forwardTime(pushbackInMillis, TimeUnit.MILLISECONDS); 1313 1314 assertTrue(throttle.isAboveThreshold()); // count = 3 1315 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 2 1316 } 1317 1318 @Test throttledStream_FailWithRetriableStatusCode_WithNonRetriablePushback()1319 public void throttledStream_FailWithRetriableStatusCode_WithNonRetriablePushback() { 1320 Throttle throttle = new Throttle(4f, 0.8f); 1321 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1322 1323 ClientStream mockStream = mock(ClientStream.class); 1324 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1325 retriableStream.start(masterListener); 1326 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1327 ArgumentCaptor.forClass(ClientStreamListener.class); 1328 verify(mockStream).start(sublistenerCaptor.capture()); 1329 1330 // mimic some other call in the channel triggers a throttle countdown 1331 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1332 1333 Metadata headers = new Metadata(); 1334 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, ""); 1335 sublistenerCaptor.getValue().closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), headers); 1336 verify(retriableStreamRecorder).postCommit(); 1337 assertFalse(throttle.isAboveThreshold()); // count = 2 1338 } 1339 1340 @Test throttledStream_FailWithNonRetriableStatusCode_WithNonRetriablePushback()1341 public void throttledStream_FailWithNonRetriableStatusCode_WithNonRetriablePushback() { 1342 Throttle throttle = new Throttle(4f, 0.8f); 1343 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1344 1345 ClientStream mockStream = mock(ClientStream.class); 1346 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1347 retriableStream.start(masterListener); 1348 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1349 ArgumentCaptor.forClass(ClientStreamListener.class); 1350 verify(mockStream).start(sublistenerCaptor.capture()); 1351 1352 // mimic some other call in the channel triggers a throttle countdown 1353 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1354 1355 Metadata headers = new Metadata(); 1356 headers.put(RetriableStream.GRPC_RETRY_PUSHBACK_MS, ""); 1357 sublistenerCaptor.getValue().closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), headers); 1358 verify(retriableStreamRecorder).postCommit(); 1359 assertFalse(throttle.isAboveThreshold()); // count = 2 1360 } 1361 1362 @Test throttleStream_Succeed()1363 public void throttleStream_Succeed() { 1364 Throttle throttle = new Throttle(4f, 0.8f); 1365 RetriableStream<String> retriableStream = newThrottledRetriableStream(throttle); 1366 1367 ClientStream mockStream = mock(ClientStream.class); 1368 doReturn(mockStream).when(retriableStreamRecorder).newSubstream(anyInt()); 1369 retriableStream.start(masterListener); 1370 ArgumentCaptor<ClientStreamListener> sublistenerCaptor = 1371 ArgumentCaptor.forClass(ClientStreamListener.class); 1372 verify(mockStream).start(sublistenerCaptor.capture()); 1373 1374 // mimic some other calls in the channel trigger throttle countdowns 1375 assertTrue(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 3 1376 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 2 1377 assertFalse(throttle.onQualifiedFailureThenCheckIsAboveThreshold()); // count = 1 1378 1379 sublistenerCaptor.getValue().headersRead(new Metadata()); 1380 verify(retriableStreamRecorder).postCommit(); 1381 assertFalse(throttle.isAboveThreshold()); // count = 1.8 1382 1383 // mimic some other call in the channel triggers a success 1384 throttle.onSuccess(); 1385 assertTrue(throttle.isAboveThreshold()); // count = 2.6 1386 } 1387 1388 @Test transparentRetry()1389 public void transparentRetry() { 1390 ClientStream mockStream1 = mock(ClientStream.class); 1391 ClientStream mockStream2 = mock(ClientStream.class); 1392 ClientStream mockStream3 = mock(ClientStream.class); 1393 InOrder inOrder = inOrder( 1394 retriableStreamRecorder, 1395 mockStream1, mockStream2, mockStream3); 1396 1397 // start 1398 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 1399 retriableStream.start(masterListener); 1400 1401 inOrder.verify(retriableStreamRecorder).newSubstream(0); 1402 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 1403 ArgumentCaptor.forClass(ClientStreamListener.class); 1404 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 1405 inOrder.verifyNoMoreInteractions(); 1406 1407 // transparent retry 1408 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0); 1409 sublistenerCaptor1.getValue() 1410 .closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), REFUSED, new Metadata()); 1411 1412 inOrder.verify(retriableStreamRecorder).newSubstream(0); 1413 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 1414 ArgumentCaptor.forClass(ClientStreamListener.class); 1415 inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); 1416 inOrder.verifyNoMoreInteractions(); 1417 verify(retriableStreamRecorder, never()).postCommit(); 1418 assertEquals(0, fakeClock.numPendingTasks()); 1419 1420 // no more transparent retry 1421 doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(1); 1422 sublistenerCaptor2.getValue() 1423 .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); 1424 1425 assertEquals(1, fakeClock.numPendingTasks()); 1426 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 1427 inOrder.verify(retriableStreamRecorder).newSubstream(1); 1428 ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = 1429 ArgumentCaptor.forClass(ClientStreamListener.class); 1430 inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); 1431 inOrder.verifyNoMoreInteractions(); 1432 verify(retriableStreamRecorder, never()).postCommit(); 1433 assertEquals(0, fakeClock.numPendingTasks()); 1434 } 1435 1436 @Test normalRetry_thenNoTransparentRetry_butNormalRetry()1437 public void normalRetry_thenNoTransparentRetry_butNormalRetry() { 1438 ClientStream mockStream1 = mock(ClientStream.class); 1439 ClientStream mockStream2 = mock(ClientStream.class); 1440 ClientStream mockStream3 = mock(ClientStream.class); 1441 InOrder inOrder = inOrder( 1442 retriableStreamRecorder, 1443 mockStream1, mockStream2, mockStream3); 1444 1445 // start 1446 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 1447 retriableStream.start(masterListener); 1448 1449 inOrder.verify(retriableStreamRecorder).newSubstream(0); 1450 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 1451 ArgumentCaptor.forClass(ClientStreamListener.class); 1452 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 1453 inOrder.verifyNoMoreInteractions(); 1454 1455 // normal retry 1456 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 1457 sublistenerCaptor1.getValue() 1458 .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); 1459 1460 assertEquals(1, fakeClock.numPendingTasks()); 1461 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 1462 inOrder.verify(retriableStreamRecorder).newSubstream(1); 1463 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 1464 ArgumentCaptor.forClass(ClientStreamListener.class); 1465 inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); 1466 inOrder.verifyNoMoreInteractions(); 1467 verify(retriableStreamRecorder, never()).postCommit(); 1468 assertEquals(0, fakeClock.numPendingTasks()); 1469 1470 // no more transparent retry 1471 doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); 1472 sublistenerCaptor2.getValue() 1473 .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), REFUSED, new Metadata()); 1474 1475 assertEquals(1, fakeClock.numPendingTasks()); 1476 fakeClock.forwardTime( 1477 (long) (INITIAL_BACKOFF_IN_SECONDS * BACKOFF_MULTIPLIER * FAKE_RANDOM), TimeUnit.SECONDS); 1478 inOrder.verify(retriableStreamRecorder).newSubstream(2); 1479 ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = 1480 ArgumentCaptor.forClass(ClientStreamListener.class); 1481 inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); 1482 inOrder.verifyNoMoreInteractions(); 1483 verify(retriableStreamRecorder, never()).postCommit(); 1484 } 1485 1486 @Test normalRetry_thenNoTransparentRetry_andNoMoreRetry()1487 public void normalRetry_thenNoTransparentRetry_andNoMoreRetry() { 1488 ClientStream mockStream1 = mock(ClientStream.class); 1489 ClientStream mockStream2 = mock(ClientStream.class); 1490 ClientStream mockStream3 = mock(ClientStream.class); 1491 InOrder inOrder = inOrder( 1492 retriableStreamRecorder, 1493 mockStream1, mockStream2, mockStream3); 1494 1495 // start 1496 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 1497 retriableStream.start(masterListener); 1498 1499 inOrder.verify(retriableStreamRecorder).newSubstream(0); 1500 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 1501 ArgumentCaptor.forClass(ClientStreamListener.class); 1502 inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); 1503 inOrder.verifyNoMoreInteractions(); 1504 1505 // normal retry 1506 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 1507 sublistenerCaptor1.getValue() 1508 .closed(Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); 1509 1510 assertEquals(1, fakeClock.numPendingTasks()); 1511 fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); 1512 inOrder.verify(retriableStreamRecorder).newSubstream(1); 1513 ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = 1514 ArgumentCaptor.forClass(ClientStreamListener.class); 1515 inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); 1516 inOrder.verifyNoMoreInteractions(); 1517 verify(retriableStreamRecorder, never()).postCommit(); 1518 assertEquals(0, fakeClock.numPendingTasks()); 1519 1520 // no more transparent retry 1521 doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); 1522 sublistenerCaptor2.getValue() 1523 .closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), REFUSED, new Metadata()); 1524 1525 verify(retriableStreamRecorder).postCommit(); 1526 } 1527 1528 @Test droppedShouldNeverRetry()1529 public void droppedShouldNeverRetry() { 1530 ClientStream mockStream1 = mock(ClientStream.class); 1531 ClientStream mockStream2 = mock(ClientStream.class); 1532 doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); 1533 doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); 1534 1535 // start 1536 retriableStream.start(masterListener); 1537 1538 verify(retriableStreamRecorder).newSubstream(0); 1539 ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = 1540 ArgumentCaptor.forClass(ClientStreamListener.class); 1541 verify(mockStream1).start(sublistenerCaptor1.capture()); 1542 1543 // drop and verify no retry 1544 Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); 1545 sublistenerCaptor1.getValue().closed(status, DROPPED, new Metadata()); 1546 1547 verifyNoMoreInteractions(mockStream1, mockStream2); 1548 verify(retriableStreamRecorder).postCommit(); 1549 verify(masterListener).closed(same(status), any(Metadata.class)); 1550 } 1551 1552 /** 1553 * Used to stub a retriable stream as well as to record methods of the retriable stream being 1554 * called. 1555 */ 1556 private interface RetriableStreamRecorder { postCommit()1557 void postCommit(); 1558 newSubstream(int previousAttempts)1559 ClientStream newSubstream(int previousAttempts); 1560 prestart()1561 Status prestart(); 1562 } 1563 } 1564