1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal.testing; 18 19 import io.grpc.ClientStreamTracer; 20 import io.grpc.Status; 21 import java.util.concurrent.CountDownLatch; 22 import java.util.concurrent.TimeUnit; 23 import java.util.concurrent.atomic.AtomicReference; 24 25 /** 26 * A {@link ClientStreamTracer} suitable for testing. 27 */ 28 public class TestClientStreamTracer extends ClientStreamTracer implements TestStreamTracer { 29 private final TestBaseStreamTracer delegate = new TestBaseStreamTracer(); 30 protected final CountDownLatch outboundHeadersLatch = new CountDownLatch(1); 31 protected final AtomicReference<Throwable> outboundHeadersCalled = 32 new AtomicReference<Throwable>(); 33 protected final AtomicReference<Throwable> inboundHeadersCalled = 34 new AtomicReference<Throwable>(); 35 36 @Override await()37 public void await() throws InterruptedException { 38 delegate.await(); 39 } 40 41 @Override await(long timeout, TimeUnit timeUnit)42 public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException { 43 return delegate.await(timeout, timeUnit); 44 } 45 46 /** 47 * Returns if {@link ClientStreamTracer#inboundHeaders} has been called. 48 */ getInboundHeaders()49 public boolean getInboundHeaders() { 50 return inboundHeadersCalled.get() != null; 51 } 52 53 /** 54 * Returns if {@link ClientStreamTracer#outboundHeaders} has been called. 55 */ getOutboundHeaders()56 public boolean getOutboundHeaders() { 57 return outboundHeadersCalled.get() != null; 58 } 59 60 /** 61 * Allow tests to await the outbound header event, which depending on the test case may be 62 * necessary (e.g., if we test for a Netty client's outbound headers upon receiving the start of 63 * stream on the server side, the tracer won't know that headers were sent until a channel future 64 * executes). 65 */ awaitOutboundHeaders(int timeout, TimeUnit unit)66 public boolean awaitOutboundHeaders(int timeout, TimeUnit unit) throws Exception { 67 return outboundHeadersLatch.await(timeout, unit); 68 } 69 70 @Override getStatus()71 public Status getStatus() { 72 return delegate.getStatus(); 73 } 74 75 @Override getInboundWireSize()76 public long getInboundWireSize() { 77 return delegate.getInboundWireSize(); 78 } 79 80 @Override getInboundUncompressedSize()81 public long getInboundUncompressedSize() { 82 return delegate.getInboundUncompressedSize(); 83 } 84 85 @Override getOutboundWireSize()86 public long getOutboundWireSize() { 87 return delegate.getOutboundWireSize(); 88 } 89 90 @Override getOutboundUncompressedSize()91 public long getOutboundUncompressedSize() { 92 return delegate.getOutboundUncompressedSize(); 93 } 94 95 @Override setFailDuplicateCallbacks(boolean fail)96 public void setFailDuplicateCallbacks(boolean fail) { 97 delegate.setFailDuplicateCallbacks(fail); 98 } 99 100 @Override nextOutboundEvent()101 public String nextOutboundEvent() { 102 return delegate.nextOutboundEvent(); 103 } 104 105 @Override nextInboundEvent()106 public String nextInboundEvent() { 107 return delegate.nextInboundEvent(); 108 } 109 110 @Override outboundWireSize(long bytes)111 public void outboundWireSize(long bytes) { 112 delegate.outboundWireSize(bytes); 113 } 114 115 @Override inboundWireSize(long bytes)116 public void inboundWireSize(long bytes) { 117 delegate.inboundWireSize(bytes); 118 } 119 120 @Override outboundUncompressedSize(long bytes)121 public void outboundUncompressedSize(long bytes) { 122 delegate.outboundUncompressedSize(bytes); 123 } 124 125 @Override inboundUncompressedSize(long bytes)126 public void inboundUncompressedSize(long bytes) { 127 delegate.inboundUncompressedSize(bytes); 128 } 129 130 @Override streamClosed(Status status)131 public void streamClosed(Status status) { 132 delegate.streamClosed(status); 133 } 134 135 @Override inboundMessage(int seqNo)136 public void inboundMessage(int seqNo) { 137 delegate.inboundMessage(seqNo); 138 } 139 140 @Override outboundMessage(int seqNo)141 public void outboundMessage(int seqNo) { 142 delegate.outboundMessage(seqNo); 143 } 144 145 @Override outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize)146 public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { 147 delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); 148 } 149 150 @Override inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize)151 public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { 152 delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); 153 } 154 155 @Override outboundHeaders()156 public void outboundHeaders() { 157 if (!outboundHeadersCalled.compareAndSet(null, new Exception("first stack")) 158 && delegate.failDuplicateCallbacks.get()) { 159 throw new AssertionError( 160 "outboundHeaders called more than once", 161 new Exception("second stack", outboundHeadersCalled.get())); 162 } 163 outboundHeadersLatch.countDown(); 164 } 165 166 @Override inboundHeaders()167 public void inboundHeaders() { 168 if (!inboundHeadersCalled.compareAndSet(null, new Exception("first stack")) 169 && delegate.failDuplicateCallbacks.get()) { 170 throw new AssertionError( 171 "inboundHeaders called more than once", 172 new Exception("second stack", inboundHeadersCalled.get())); 173 } 174 } 175 } 176