1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal.testing;
18 
19 import io.grpc.ClientStreamTracer;
20 import io.grpc.Status;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicReference;
24 
25 /**
26  * A {@link ClientStreamTracer} suitable for testing.
27  */
28 public class TestClientStreamTracer extends ClientStreamTracer implements TestStreamTracer {
29   private final TestBaseStreamTracer delegate = new TestBaseStreamTracer();
30   protected final CountDownLatch outboundHeadersLatch = new CountDownLatch(1);
31   protected final AtomicReference<Throwable> outboundHeadersCalled =
32       new AtomicReference<Throwable>();
33   protected final AtomicReference<Throwable> inboundHeadersCalled =
34       new AtomicReference<Throwable>();
35 
36   @Override
await()37   public void await() throws InterruptedException {
38     delegate.await();
39   }
40 
41   @Override
await(long timeout, TimeUnit timeUnit)42   public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
43     return delegate.await(timeout, timeUnit);
44   }
45 
46   /**
47    * Returns if {@link ClientStreamTracer#inboundHeaders} has been called.
48    */
getInboundHeaders()49   public boolean getInboundHeaders() {
50     return inboundHeadersCalled.get() != null;
51   }
52 
53   /**
54    * Returns if {@link ClientStreamTracer#outboundHeaders} has been called.
55    */
getOutboundHeaders()56   public boolean getOutboundHeaders() {
57     return outboundHeadersCalled.get() != null;
58   }
59 
60   /**
61    * Allow tests to await the outbound header event, which depending on the test case may be
62    * necessary (e.g., if we test for a Netty client's outbound headers upon receiving the start of
63    * stream on the server side, the tracer won't know that headers were sent until a channel future
64    * executes).
65    */
awaitOutboundHeaders(int timeout, TimeUnit unit)66   public boolean awaitOutboundHeaders(int timeout, TimeUnit unit) throws Exception {
67     return outboundHeadersLatch.await(timeout, unit);
68   }
69 
70   @Override
getStatus()71   public Status getStatus() {
72     return delegate.getStatus();
73   }
74 
75   @Override
getInboundWireSize()76   public long getInboundWireSize() {
77     return delegate.getInboundWireSize();
78   }
79 
80   @Override
getInboundUncompressedSize()81   public long getInboundUncompressedSize() {
82     return delegate.getInboundUncompressedSize();
83   }
84 
85   @Override
getOutboundWireSize()86   public long getOutboundWireSize() {
87     return delegate.getOutboundWireSize();
88   }
89 
90   @Override
getOutboundUncompressedSize()91   public long getOutboundUncompressedSize() {
92     return delegate.getOutboundUncompressedSize();
93   }
94 
95   @Override
setFailDuplicateCallbacks(boolean fail)96   public void setFailDuplicateCallbacks(boolean fail) {
97     delegate.setFailDuplicateCallbacks(fail);
98   }
99 
100   @Override
nextOutboundEvent()101   public String nextOutboundEvent() {
102     return delegate.nextOutboundEvent();
103   }
104 
105   @Override
nextInboundEvent()106   public String nextInboundEvent() {
107     return delegate.nextInboundEvent();
108   }
109 
110   @Override
outboundWireSize(long bytes)111   public void outboundWireSize(long bytes) {
112     delegate.outboundWireSize(bytes);
113   }
114 
115   @Override
inboundWireSize(long bytes)116   public void inboundWireSize(long bytes) {
117     delegate.inboundWireSize(bytes);
118   }
119 
120   @Override
outboundUncompressedSize(long bytes)121   public void outboundUncompressedSize(long bytes) {
122     delegate.outboundUncompressedSize(bytes);
123   }
124 
125   @Override
inboundUncompressedSize(long bytes)126   public void inboundUncompressedSize(long bytes) {
127     delegate.inboundUncompressedSize(bytes);
128   }
129 
130   @Override
streamClosed(Status status)131   public void streamClosed(Status status) {
132     delegate.streamClosed(status);
133   }
134 
135   @Override
inboundMessage(int seqNo)136   public void inboundMessage(int seqNo) {
137     delegate.inboundMessage(seqNo);
138   }
139 
140   @Override
outboundMessage(int seqNo)141   public void outboundMessage(int seqNo) {
142     delegate.outboundMessage(seqNo);
143   }
144 
145   @Override
outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize)146   public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
147     delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
148   }
149 
150   @Override
inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize)151   public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
152     delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);
153   }
154 
155   @Override
outboundHeaders()156   public void outboundHeaders() {
157     if (!outboundHeadersCalled.compareAndSet(null, new Exception("first stack"))
158         && delegate.failDuplicateCallbacks.get()) {
159       throw new AssertionError(
160           "outboundHeaders called more than once",
161           new Exception("second stack", outboundHeadersCalled.get()));
162     }
163     outboundHeadersLatch.countDown();
164   }
165 
166   @Override
inboundHeaders()167   public void inboundHeaders() {
168     if (!inboundHeadersCalled.compareAndSet(null, new Exception("first stack"))
169         && delegate.failDuplicateCallbacks.get()) {
170       throw new AssertionError(
171           "inboundHeaders called more than once",
172           new Exception("second stack", inboundHeadersCalled.get()));
173     }
174   }
175 }
176