/*
 * Copyright 2017 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
 * A logical {@link ClientStream} that is retriable.
 *
 * <p>Every operation invoked on this stream before it becomes pass-through is recorded as a
 * {@link BufferEntry}. Each physical attempt ({@link Substream}) replays the recorded entries via
 * {@link #drain(Substream)}. Once one attempt is chosen by {@link #commit(Substream)} and has
 * caught up with the buffer, the buffer is dropped and later operations go straight to the
 * winning substream.
 */
abstract class RetriableStream<ReqT> implements ClientStream {
  @VisibleForTesting
  static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS =
      Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);

  @VisibleForTesting
  static final Metadata.Key<String> GRPC_RETRY_PUSHBACK_MS =
      Metadata.Key.of("grpc-retry-pushback-ms", Metadata.ASCII_STRING_MARSHALLER);

  private static final Status CANCELLED_BECAUSE_COMMITTED =
      Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");

  private final MethodDescriptor<ReqT, ?> method;
  private final Executor callExecutor;
  private final ScheduledExecutorService scheduledExecutorService;
  // Must not modify it.
  private final Metadata headers;
  private final RetryPolicy.Provider retryPolicyProvider;
  private final HedgingPolicy.Provider hedgingPolicyProvider;
  // Lazily fetched from retryPolicyProvider the first time a retry decision is needed.
  private RetryPolicy retryPolicy;

  /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
  private final Object lock = new Object();

  private final ChannelBufferMeter channelBufferUsed;
  private final long perRpcBufferLimit;
  private final long channelBufferLimit;
  @Nullable
  private final Throttle throttle;

  private volatile State state = new State(
      new ArrayList<BufferEntry>(8), Collections.<Substream>emptyList(), null, false, false);

  /**
   * Either transparent retry happened or reached server's application logic.
   */
  private boolean noMoreTransparentRetry;

  // Used for recording the share of buffer used for the current call out of the channel buffer.
  // This field would not be necessary if there is no channel buffer limit.
  @GuardedBy("lock")
  private long perRpcBufferUsed;

  private ClientStreamListener masterListener;

  // Written by Sublistener.closed() and by the scheduled retry task itself, and read by cancel(),
  // all without holding the lock; volatile so that cancel() observes the pending retry.
  private volatile Future<?> scheduledRetry;
  private long nextBackoffIntervalNanos;

  /**
   * Creates a retriable stream. No physical stream is created until
   * {@link #start(ClientStreamListener)} is called.
   *
   * @param method descriptor used to serialize request messages for each attempt
   * @param headers the original request headers; copied, never modified, for each attempt
   * @param channelBufferUsed meter that this RPC's buffered bytes are added to
   * @param perRpcBufferLimit buffered-byte count above which this RPC commits to an attempt
   * @param channelBufferLimit meter value above which this RPC commits to an attempt
   * @param callExecutor executor on which retry attempts are created and drained
   * @param scheduledExecutorService scheduler used to delay retries by the backoff interval
   * @param retryPolicyProvider supplies the retry policy; must not be null
   * @param hedgingPolicyProvider supplies the hedging policy; must not be null
   * @param throttle retry throttle, or null when throttling is disabled
   */
  RetriableStream(
      MethodDescriptor<ReqT, ?> method, Metadata headers,
      ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
      Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
      RetryPolicy.Provider retryPolicyProvider, HedgingPolicy.Provider hedgingPolicyProvider,
      @Nullable Throttle throttle) {
    this.method = method;
    this.channelBufferUsed = channelBufferUsed;
    this.perRpcBufferLimit = perRpcBufferLimit;
    this.channelBufferLimit = channelBufferLimit;
    this.callExecutor = callExecutor;
    this.scheduledExecutorService = scheduledExecutorService;
    this.headers = headers;
    this.retryPolicyProvider = checkNotNull(retryPolicyProvider, "retryPolicyProvider");
    this.hedgingPolicyProvider = checkNotNull(hedgingPolicyProvider, "hedgingPolicyProvider");
    this.throttle = throttle;
  }

  /**
   * Commits to the given substream if nothing has been committed yet.
   *
   * <p>Under the lock this switches {@link #state} to the committed state and subtracts this
   * RPC's share from the channel buffer meter. The returned task, to be run outside the lock,
   * cancels every previously drained substream other than the winner and then calls
   * {@link #postCommit()}.
   *
   * @return the post-commit task, or null if a substream had already been committed
   */
  @Nullable // null if already committed
  @CheckReturnValue
  private Runnable commit(final Substream winningSubstream) {
    synchronized (lock) {
      if (state.winningSubstream != null) {
        return null;
      }
      // Snapshot before the state transition: committed() replaces the drained collection.
      final Collection<Substream> savedDrainedSubstreams = state.drainedSubstreams;

      state = state.committed(winningSubstream);

      // subtract the share of this RPC from channelBufferUsed.
      channelBufferUsed.addAndGet(-perRpcBufferUsed);

      class CommitTask implements Runnable {
        @Override
        public void run() {
          // For hedging only, not needed for normal retry
          // TODO(zdapeng): also cancel all the scheduled hedges.
          for (Substream substream : savedDrainedSubstreams) {
            if (substream != winningSubstream) {
              substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
            }
          }

          postCommit();
        }
      }

      return new CommitTask();
    }
  }

  /** Hook invoked by the commit task after the losing substreams have been cancelled. */
  abstract void postCommit();

  /**
   * Calls commit() and if successful runs the post commit task.
   */
  private void commitAndRun(Substream winningSubstream) {
    Runnable postCommitTask = commit(winningSubstream);

    if (postCommitTask != null) {
      postCommitTask.run();
    }
  }

  /**
   * Creates the wrapper for a new attempt together with its (not yet started) physical stream.
   *
   * @param previousAttempts value recorded on the substream and, when positive, sent in the
   *     {@code grpc-previous-rpc-attempts} header
   */
  private Substream createSubstream(int previousAttempts) {
    Substream sub = new Substream(previousAttempts);
    // one tracer per substream
    final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
    ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
      @Override
      public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
        return bufferSizeTracer;
      }
    };

    Metadata newHeaders = updateHeaders(headers, previousAttempts);
    // NOTICE: This set _must_ be done before stream.start() and it actually is.
    sub.stream = newSubstream(tracerFactory, newHeaders);
    return sub;
  }

  /**
   * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
   * Client stream is not yet started.
   */
  abstract ClientStream newSubstream(
      ClientStreamTracer.Factory tracerFactory, Metadata headers);

  /**
   * Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC.
   *
   * @return a fresh {@link Metadata} holding a merged copy of {@code originalHeaders}, plus the
   *     attempts header when {@code previousAttempts} is positive
   */
  @VisibleForTesting
  final Metadata updateHeaders(
      Metadata originalHeaders, int previousAttempts) {
    Metadata newHeaders = new Metadata();
    newHeaders.merge(originalHeaders);
    if (previousAttempts > 0) {
      newHeaders.put(GRPC_PREVIOUS_RPC_ATTEMPTS, String.valueOf(previousAttempts));
    }
    return newHeaders;
  }

  /**
   * Replays the buffered entries on the given substream until it has caught up with the buffer.
   *
   * <p>Entries are copied out of the buffer under the lock in chunks and executed outside the
   * lock. Draining stops when the substream is closed, when the RPC is cancelled, or when a
   * different substream has been committed; in the last case this substream is cancelled.
   */
  private void drain(Substream substream) {
    int index = 0;
    int chunk = 0x80;
    List<BufferEntry> list = null;

    while (true) {
      State savedState;

      synchronized (lock) {
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me
          break;
        }
        if (index == savedState.buffer.size()) { // I'm drained
          state = savedState.substreamDrained(substream);
          return;
        }

        if (substream.closed) {
          return;
        }

        int stop = Math.min(index + chunk, savedState.buffer.size());
        if (list == null) {
          list = new ArrayList<>(savedState.buffer.subList(index, stop));
        } else {
          list.clear();
          list.addAll(savedState.buffer.subList(index, stop));
        }
        index = stop;
      }

      for (BufferEntry bufferEntry : list) {
        savedState = state;
        if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
          // committed but not me; the check at the top of the outer loop exits the drain
          break;
        }
        if (savedState.cancelled) {
          checkState(
              savedState.winningSubstream == substream,
              "substream should be CANCELLED_BECAUSE_COMMITTED already");
          return;
        }
        bufferEntry.runWith(substream);
      }
    }

    substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
  }

  /**
   * Runs pre-start tasks. Returns the Status of shutdown if the channel is shutdown.
   */
  @CheckReturnValue
  @Nullable
  abstract Status prestart();

  /**
   * Starts the first RPC attempt.
   *
   * <p>If {@link #prestart()} reports a shutdown status the stream is cancelled with it and no
   * attempt is made.
   */
  @Override
  public final void start(ClientStreamListener listener) {
    masterListener = listener;

    Status shutdownStatus = prestart();

    if (shutdownStatus != null) {
      cancel(shutdownStatus);
      return;
    }

    class StartEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.start(new Sublistener(substream));
      }
    }

    synchronized (lock) {
      state.buffer.add(new StartEntry());
    }

    Substream substream = createSubstream(0);
    drain(substream);

    // TODO(zdapeng): schedule hedging if needed
  }

  /**
   * Cancels the RPC.
   *
   * <p>If no attempt has been committed yet, a no-op substream is committed, any scheduled retry
   * is cancelled, the master listener is closed with {@code reason} and the commit task is run.
   * Otherwise the cancellation is forwarded to the winning substream and the state is marked
   * cancelled.
   */
  @Override
  public final void cancel(Status reason) {
    Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
    noopSubstream.stream = new NoopClientStream();
    Runnable runnable = commit(noopSubstream);

    if (runnable != null) {
      Future<?> savedScheduledRetry = scheduledRetry;
      if (savedScheduledRetry != null) {
        savedScheduledRetry.cancel(false);
        scheduledRetry = null;
      }
      masterListener.closed(reason, new Metadata());
      runnable.run();
      return;
    }

    state.winningSubstream.stream.cancel(reason);
    synchronized (lock) {
      // This is not required, but causes a short-circuit in the draining process.
      state = state.cancelled();
    }
  }

  /**
   * Records the entry in the buffer unless the stream is already pass-through, then runs it on
   * every substream that was drained at the time the lock was held.
   */
  private void delayOrExecute(BufferEntry bufferEntry) {
    Collection<Substream> savedDrainedSubstreams;
    synchronized (lock) {
      if (!state.passThrough) {
        state.buffer.add(bufferEntry);
      }
      savedDrainedSubstreams = state.drainedSubstreams;
    }

    for (Substream substream : savedDrainedSubstreams) {
      bufferEntry.runWith(substream);
    }
  }

  /**
   * Do not use it directly. Use {@link #sendMessage(ReqT)} instead because we don't use InputStream
   * for buffering.
   *
   * @throws IllegalStateException always
   */
  @Override
  public final void writeMessage(InputStream message) {
    throw new IllegalStateException("RetriableStream.writeMessage() should not be called directly");
  }

  /**
   * Sends a request message. The message object itself is retained in the buffer and serialized
   * with {@code method.streamRequest(message)} each time it is written to a substream.
   */
  final void sendMessage(final ReqT message) {
    State savedState = state;
    if (savedState.passThrough) {
      savedState.winningSubstream.stream.writeMessage(method.streamRequest(message));
      return;
    }

    class SendMessageEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.writeMessage(method.streamRequest(message));
      }
    }

    delayOrExecute(new SendMessageEntry());
  }

  /** Requests messages; forwarded directly when pass-through, otherwise buffered and replayed. */
  @Override
  public final void request(final int numMessages) {
    State savedState = state;
    if (savedState.passThrough) {
      savedState.winningSubstream.stream.request(numMessages);
      return;
    }

    class RequestEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.request(numMessages);
      }
    }

    delayOrExecute(new RequestEntry());
  }

  /** Flushes; forwarded directly when pass-through, otherwise buffered and replayed. */
  @Override
  public final void flush() {
    State savedState = state;
    if (savedState.passThrough) {
      savedState.winningSubstream.stream.flush();
      return;
    }

    class FlushEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.flush();
      }
    }

    delayOrExecute(new FlushEntry());
  }

  /** Returns true if any currently drained substream reports that it is ready. */
  @Override
  public final boolean isReady() {
    for (Substream substream : state.drainedSubstreams) {
      if (substream.stream.isReady()) {
        return true;
      }
    }
    return false;
  }

  @Override
  public final void setCompressor(final Compressor compressor) {
    class CompressorEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setCompressor(compressor);
      }
    }

    delayOrExecute(new CompressorEntry());
  }

  @Override
  public final void setFullStreamDecompression(final boolean fullStreamDecompression) {
    class FullStreamDecompressionEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setFullStreamDecompression(fullStreamDecompression);
      }
    }

    delayOrExecute(new FullStreamDecompressionEntry());
  }

  @Override
  public final void setMessageCompression(final boolean enable) {
    class MessageCompressionEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setMessageCompression(enable);
      }
    }

    delayOrExecute(new MessageCompressionEntry());
  }

  @Override
  public final void halfClose() {
    class HalfCloseEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.halfClose();
      }
    }

    delayOrExecute(new HalfCloseEntry());
  }

  @Override
  public final void setAuthority(final String authority) {
    class AuthorityEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setAuthority(authority);
      }
    }

    delayOrExecute(new AuthorityEntry());
  }

  @Override
  public final void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
    class DecompressorRegistryEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setDecompressorRegistry(decompressorRegistry);
      }
    }

    delayOrExecute(new DecompressorRegistryEntry());
  }

  @Override
  public final void setMaxInboundMessageSize(final int maxSize) {
    class MaxInboundMessageSizeEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setMaxInboundMessageSize(maxSize);
      }
    }

    delayOrExecute(new MaxInboundMessageSizeEntry());
  }

  @Override
  public final void setMaxOutboundMessageSize(final int maxSize) {
    class MaxOutboundMessageSizeEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setMaxOutboundMessageSize(maxSize);
      }
    }

    delayOrExecute(new MaxOutboundMessageSizeEntry());
  }

  @Override
  public final void setDeadline(final Deadline deadline) {
    class DeadlineEntry implements BufferEntry {
      @Override
      public void runWith(Substream substream) {
        substream.stream.setDeadline(deadline);
      }
    }

    delayOrExecute(new DeadlineEntry());
  }

  /**
   * Returns the attributes of the winning substream's stream, or {@link Attributes#EMPTY} if no
   * substream has been committed yet.
   */
  @Override
  public final Attributes getAttributes() {
    // Single read of the volatile state so the null check and the use see the same snapshot.
    Substream committedSubstream = state.winningSubstream;
    if (committedSubstream != null) {
      return committedSubstream.stream.getAttributes();
    }
    return Attributes.EMPTY;
  }

  // Source of jitter for the retry backoff; replaceable via setRandom() for tests.
  private static Random random = new Random();

  @VisibleForTesting
  static void setRandom(Random random) {
    RetriableStream.random = random;
  }

  /** Whether hedging is in effect for this stream; this implementation always returns false. */
  boolean hasHedging() {
    return false;
  }

  /** A recorded stream operation that can be replayed on any substream. */
  private interface BufferEntry {
    /** Replays the buffer entry with the given stream. */
    void runWith(Substream substream);
  }

  /**
   * Listener attached to one physical attempt. It decides whether events of that attempt are
   * surfaced to the master listener, and whether a closed attempt is retried.
   */
  private final class Sublistener implements ClientStreamListener {
    final Substream substream;

    Sublistener(Substream substream) {
      this.substream = substream;
    }

    /**
     * Tries to commit this attempt; if it is the winner, forwards the headers and credits the
     * throttle (when present) with a success.
     */
    @Override
    public void headersRead(Metadata headers) {
      commitAndRun(substream);
      if (state.winningSubstream == substream) {
        masterListener.headersRead(headers);
        if (throttle != null) {
          throttle.onSuccess();
        }
      }
    }

    @Override
    public void closed(Status status, Metadata trailers) {
      closed(status, RpcProgress.PROCESSED, trailers);
    }

    /**
     * Handles the end of this attempt: either starts a transparent retry, schedules a retry
     * after the computed backoff, or commits this attempt and closes the master listener.
     */
    @Override
    public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
      synchronized (lock) {
        state = state.substreamClosed(substream);
      }

      // handle a race between buffer limit exceeded and closed, when setting
      // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
      if (substream.bufferLimitExceeded) {
        commitAndRun(substream);
        if (state.winningSubstream == substream) {
          masterListener.closed(status, trailers);
        }
        return;
      }

      if (state.winningSubstream == null) {
        if (rpcProgress == RpcProgress.REFUSED && !noMoreTransparentRetry) {
          // TODO(zdapeng): in hedging case noMoreTransparentRetry might need be synchronized.
          noMoreTransparentRetry = true;
          callExecutor.execute(new Runnable() {
            @Override
            public void run() {
              // transparent retry: does not count as an additional attempt
              Substream newSubstream = createSubstream(
                  substream.previousAttempts);
              drain(newSubstream);
            }
          });
          return;
        } else if (rpcProgress == RpcProgress.DROPPED) {
          // For normal retry, nothing need be done here, will just commit.
          // For hedging:
          // TODO(zdapeng): cancel all scheduled hedges (TBD)
        } else {
          noMoreTransparentRetry = true;

          if (retryPolicy == null) {
            retryPolicy = retryPolicyProvider.get();
            nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
          }

          RetryPlan retryPlan = makeRetryDecision(retryPolicy, status, trailers);
          if (retryPlan.shouldRetry) {
            // The check state.winningSubstream == null, checking if is not already committed, is
            // racy, but is still safe b/c the retry will also handle committed/cancellation
            scheduledRetry = scheduledExecutorService.schedule(
                new Runnable() {
                  @Override
                  public void run() {
                    scheduledRetry = null;
                    callExecutor.execute(new Runnable() {
                      @Override
                      public void run() {
                        // retry
                        Substream newSubstream = createSubstream(substream.previousAttempts + 1);
                        drain(newSubstream);
                      }
                    });
                  }
                },
                retryPlan.backoffNanos,
                TimeUnit.NANOSECONDS);
            return;
          }
        }
      }

      if (!hasHedging()) {
        commitAndRun(substream);
        if (state.winningSubstream == substream) {
          masterListener.closed(status, trailers);
        }
      }

      // TODO(zdapeng): in hedge case, if this is a fatal status, cancel all the other attempts, and
      // close the masterListener.
    }

    /**
     * Decides in current situation whether or not the RPC should retry and if it should retry how
     * long the backoff should be. The decision does not take the commitment status into account, so
     * caller should check it separately.
     *
     * <p>A {@code grpc-retry-pushback-ms} trailer that parses to a non-negative integer forces a
     * retry after that many milliseconds and resets the backoff interval. A negative or
     * unparseable value forbids retrying. Without the trailer, a retryable status code retries
     * after a random fraction of the current backoff interval, which is then multiplied by the
     * policy's multiplier and capped at its maximum.
     */
    // TODO(zdapeng): add HedgingPolicy as param
    private RetryPlan makeRetryDecision(RetryPolicy retryPolicy, Status status, Metadata trailer) {
      boolean shouldRetry = false;
      long backoffNanos = 0L;
      boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());

      String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
      Integer pushbackMillis = null;
      if (pushbackStr != null) {
        try {
          pushbackMillis = Integer.valueOf(pushbackStr);
        } catch (NumberFormatException e) {
          // An unparseable pushback is treated like a negative one: do not retry.
          pushbackMillis = -1;
        }
      }

      boolean isThrottled = false;
      if (throttle != null) {
        if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
          isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
        }
      }

      if (retryPolicy.maxAttempts > substream.previousAttempts + 1 && !isThrottled) {
        if (pushbackMillis == null) {
          if (isRetryableStatusCode) {
            shouldRetry = true;
            backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
            nextBackoffIntervalNanos = Math.min(
                (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
                retryPolicy.maxBackoffNanos);

          } // else no retry
        } else if (pushbackMillis >= 0) {
          shouldRetry = true;
          backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
          nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
        } // else no retry
      } // else no retry

      // TODO(zdapeng): transparent retry
      // TODO(zdapeng): hedging
      return new RetryPlan(shouldRetry, backoffNanos);
    }

    /** Forwards messages to the master listener only when this attempt is the winner. */
    @Override
    public void messagesAvailable(MessageProducer producer) {
      State savedState = state;
      checkState(
          savedState.winningSubstream != null, "Headers should be received prior to messages.");
      if (savedState.winningSubstream != substream) {
        return;
      }
      masterListener.messagesAvailable(producer);
    }

    @Override
    public void onReady() {
      // TODO(zdapeng): the more correct way to handle onReady
      if (state.drainedSubstreams.contains(substream)) {
        masterListener.onReady();
      }
    }
  }

  /**
   * Snapshot of the retry state. Fields are final; every transition method returns a new
   * instance (or {@code this} when nothing changes) that the caller stores into
   * {@link RetriableStream#state} while holding {@link RetriableStream#lock}.
   */
  private static final class State {
    /** Committed and the winning substream drained. */
    final boolean passThrough;

    /** A list of buffered ClientStream runnables. Set to Null once passThrough. */
    @Nullable final List<BufferEntry> buffer;

    /**
     * Unmodifiable collection of all the substreams that are drained. Exceptional cases: Singleton
     * once passThrough; Empty if committed but not passThrough.
     */
    final Collection<Substream> drainedSubstreams;

    /** Null until committed. */
    @Nullable final Substream winningSubstream;

    /** Not required to set to true when cancelled, but can short-circuit the draining process. */
    final boolean cancelled;

    /**
     * Creates a state and verifies its invariants.
     *
     * @throws IllegalStateException if {@code passThrough} or {@code cancelled} is inconsistent
     *     with the other arguments
     * @throws NullPointerException if {@code drainedSubstreams} is null
     */
    State(
        @Nullable List<BufferEntry> buffer,
        Collection<Substream> drainedSubstreams,
        @Nullable Substream winningSubstream,
        boolean cancelled,
        boolean passThrough) {
      this.buffer = buffer;
      this.drainedSubstreams =
          checkNotNull(drainedSubstreams, "drainedSubstreams");
      this.winningSubstream = winningSubstream;
      this.cancelled = cancelled;
      this.passThrough = passThrough;

      checkState(!passThrough || buffer == null, "passThrough should imply buffer is null");
      checkState(
          !passThrough || winningSubstream != null,
          "passThrough should imply winningSubstream != null");
      checkState(
          !passThrough
              || (drainedSubstreams.size() == 1 && drainedSubstreams.contains(winningSubstream))
              || (drainedSubstreams.size() == 0 && winningSubstream.closed),
          "passThrough should imply winningSubstream is drained");
      checkState(!cancelled || winningSubstream != null, "cancelled should imply committed");
    }

    /** Returns a copy of this state with the cancelled flag set. */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State cancelled() {
      return new State(buffer, drainedSubstreams, winningSubstream, true, passThrough);
    }

    /**
     * The given substream is drained. If a substream has already been committed, it must be the
     * given one, and the resulting state is pass-through with the buffer released.
     */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State substreamDrained(Substream substream) {
      checkState(!passThrough, "Already passThrough");

      Collection<Substream> drainedSubstreams;

      if (substream.closed) {
        // A closed substream is not added to the drained collection.
        drainedSubstreams = this.drainedSubstreams;
      } else if (this.drainedSubstreams.isEmpty()) {
        // optimize for 0-retry, which is most of the cases.
        drainedSubstreams = Collections.singletonList(substream);
      } else {
        drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
        drainedSubstreams.add(substream);
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
      }

      boolean passThrough = winningSubstream != null;

      List<BufferEntry> buffer = this.buffer;
      if (passThrough) {
        checkState(
            winningSubstream == substream, "Another RPC attempt has already committed");
        buffer = null;
      }

      return new State(buffer, drainedSubstreams, winningSubstream, cancelled, passThrough);
    }

    /**
     * The given substream is closed. Marks it closed and, if it was in the drained collection,
     * returns a state without it; otherwise returns this state unchanged.
     */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State substreamClosed(Substream substream) {
      substream.closed = true;
      if (this.drainedSubstreams.contains(substream)) {
        Collection<Substream> drainedSubstreams = new ArrayList<>(this.drainedSubstreams);
        drainedSubstreams.remove(substream);
        drainedSubstreams = Collections.unmodifiableCollection(drainedSubstreams);
        return new State(buffer, drainedSubstreams, winningSubstream, cancelled, passThrough);
      } else {
        return this;
      }
    }

    /**
     * Commits to the given substream. The result is pass-through (buffer released) only if the
     * winner is already drained; otherwise the buffer is kept and the drained collection is
     * emptied.
     */
    @CheckReturnValue
    // GuardedBy RetriableStream.lock
    State committed(Substream winningSubstream) {
      checkState(this.winningSubstream == null, "Already committed");

      boolean passThrough = false;
      List<BufferEntry> buffer = this.buffer;
      Collection<Substream> drainedSubstreams;

      if (this.drainedSubstreams.contains(winningSubstream)) {
        passThrough = true;
        buffer = null;
        drainedSubstreams = Collections.singleton(winningSubstream);
      } else {
        drainedSubstreams = Collections.emptyList();
      }

      return new State(buffer, drainedSubstreams, winningSubstream, cancelled, passThrough);
    }
  }

  /**
   * A wrapper of a physical stream of a retry/hedging attempt, that comes with some useful
   * attributes.
   */
  private static final class Substream {
    ClientStream stream;

    // GuardedBy RetriableStream.lock
    boolean closed;

    // setting to true must be GuardedBy RetriableStream.lock
    boolean bufferLimitExceeded;

    final int previousAttempts;

    Substream(int previousAttempts) {
      this.previousAttempts = previousAttempts;
    }
  }


  /**
   * Traces the buffer used by a substream.
   */
  class BufferSizeTracer extends ClientStreamTracer {
    // Each buffer size tracer is dedicated to one specific substream.
    private final Substream substream;

    @GuardedBy("lock")
    long bufferNeeded;

    BufferSizeTracer(Substream substream) {
      this.substream = substream;
    }

    /**
     * A message is sent to the wire, so its reference would be released if no retry or
     * hedging were involved. So at this point we have to hold the reference of the message longer
     * for retry, and we need to increment {@code bufferNeeded} of this tracer.
     *
     * <p>If the bytes needed exceed the per-RPC limit, or the channel-wide meter exceeds the
     * channel limit after accounting for them, the substream is flagged as having exceeded the
     * buffer limit and is committed.
     */
    @Override
    public void outboundWireSize(long bytes) {
      if (state.winningSubstream != null) {
        return;
      }

      Runnable postCommitTask = null;

      // TODO(zdapeng): avoid using the same lock for both in-bound and out-bound.
      synchronized (lock) {
        if (state.winningSubstream != null || substream.closed) {
          return;
        }
        bufferNeeded += bytes;
        if (bufferNeeded <= perRpcBufferUsed) {
          return;
        }

        if (bufferNeeded > perRpcBufferLimit) {
          substream.bufferLimitExceeded = true;
        } else {
          // Only update channelBufferUsed when perRpcBufferUsed is not exceeding perRpcBufferLimit.
          long savedChannelBufferUsed =
              channelBufferUsed.addAndGet(bufferNeeded - perRpcBufferUsed);
          perRpcBufferUsed = bufferNeeded;

          if (savedChannelBufferUsed > channelBufferLimit) {
            substream.bufferLimitExceeded = true;
          }
        }

        if (substream.bufferLimitExceeded) {
          postCommitTask = commit(substream);
        }
      }

      // Run outside the lock: the task cancels other substreams and calls postCommit().
      if (postCommitTask != null) {
        postCommitTask.run();
      }
    }
  }

  /**
   * Used to keep track of the total amount of memory used to buffer retryable or hedged RPCs for
   * the Channel. There should be a single instance of it for each channel.
   */
  static final class ChannelBufferMeter {
    private final AtomicLong bufferUsed = new AtomicLong();

    /** Atomically adds {@code newBytesUsed} (may be negative) and returns the new total. */
    @VisibleForTesting
    long addAndGet(long newBytesUsed) {
      return bufferUsed.addAndGet(newBytesUsed);
    }
  }

  /**
   * Used for retry throttling.
   */
  static final class Throttle {

    private static final int THREE_DECIMAL_PLACES_SCALE_UP = 1000;

    /**
     * 1000 times the maxTokens field of the retryThrottling policy in service config.
     * The number of tokens starts at maxTokens. The token_count will always be between 0 and
     * maxTokens.
     */
    final int maxTokens;

    /**
     * Half of {@code maxTokens}.
     */
    final int threshold;

    /**
     * 1000 times the tokenRatio field of the retryThrottling policy in service config.
     */
    final int tokenRatio;

    final AtomicInteger tokenCount = new AtomicInteger();

    /**
     * Creates a throttle whose token count starts at the maximum.
     *
     * @param maxTokens maximum token count; stored multiplied by 1000 and truncated to an int
     * @param tokenRatio tokens credited per success; stored multiplied by 1000 and truncated
     */
    Throttle(float maxTokens, float tokenRatio) {
      // tokenRatio is up to 3 decimal places
      this.tokenRatio = (int) (tokenRatio * THREE_DECIMAL_PLACES_SCALE_UP);
      this.maxTokens = (int) (maxTokens * THREE_DECIMAL_PLACES_SCALE_UP);
      this.threshold = this.maxTokens / 2;
      tokenCount.set(this.maxTokens);
    }

    @VisibleForTesting
    boolean isAboveThreshold() {
      return tokenCount.get() > threshold;
    }

    /**
     * Counts down the token on qualified failure and checks if it is above the threshold
     * atomically. Qualified failure is a failure with a retryable or non-fatal status code or with
     * a not-to-retry pushback.
     *
     * @return false if the count was already zero; otherwise whether the count after removing one
     *     (scaled) token is above the threshold
     */
    @VisibleForTesting
    boolean onQualifiedFailureThenCheckIsAboveThreshold() {
      while (true) {
        int currentCount = tokenCount.get();
        if (currentCount == 0) {
          return false;
        }
        int decremented = currentCount - (1 * THREE_DECIMAL_PLACES_SCALE_UP);
        // The stored count never goes below zero.
        boolean updated = tokenCount.compareAndSet(currentCount, Math.max(decremented, 0));
        if (updated) {
          return decremented > threshold;
        }
      }
    }

    /** Credits {@code tokenRatio} tokens, never exceeding {@code maxTokens}. */
    @VisibleForTesting
    void onSuccess() {
      while (true) {
        int currentCount = tokenCount.get();
        if (currentCount == maxTokens) {
          break;
        }
        int incremented = currentCount + tokenRatio;
        boolean updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
        if (updated) {
          break;
        }
      }
    }

    /** Two throttles are equal when their configuration matches; the live count is ignored. */
    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Throttle)) {
        return false;
      }
      Throttle that = (Throttle) o;
      return maxTokens == that.maxTokens && tokenRatio == that.tokenRatio;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(maxTokens, tokenRatio);
    }
  }

  /** Outcome of a retry decision: whether to retry and, if so, after how long. */
  private static final class RetryPlan {
    final boolean shouldRetry;
    // TODO(zdapeng) boolean hasHedging
    final long backoffNanos;

    RetryPlan(boolean shouldRetry, long backoffNanos) {
      this.shouldRetry = shouldRetry;
      this.backoffNanos = backoffNanos;
    }
  }
}