1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal.testing;
18 
19 import io.grpc.Status;
20 import io.grpc.StreamTracer;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.annotation.Nullable;
28 
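/**
 * Test-facing view of a {@link StreamTracer}: exposes the status, byte counts and message events
 * that a tracer recorded so that tests can wait for and assert on them.
 */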
32 public interface TestStreamTracer {
33 
34   /**
35    * Waits for the stream to be done.
36    */
await()37   void await() throws InterruptedException;
38 
39   /**
40    * Waits for the stream to be done.
41    */
await(long timeout, TimeUnit timeUnit)42   boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException;
43 
44   /**
45    * Returns the status passed to {@link StreamTracer#streamClosed}.
46    */
getStatus()47   Status getStatus();
48 
49   /**
50    * Returns to sum of all sizes passed to {@link StreamTracer#inboundWireSize}.
51    */
getInboundWireSize()52   long getInboundWireSize();
53 
54   /**
55    * Returns to sum of all sizes passed to {@link StreamTracer#inboundUncompressedSize}.
56    */
getInboundUncompressedSize()57   long getInboundUncompressedSize();
58 
59   /**
60    * Returns to sum of all sizes passed to {@link StreamTracer#outboundWireSize}.
61    */
getOutboundWireSize()62   long getOutboundWireSize();
63 
64   /**
65    * Returns to sum of al sizes passed to {@link StreamTracer#outboundUncompressedSize}.
66    */
getOutboundUncompressedSize()67   long getOutboundUncompressedSize();
68 
69   /**
70    * Sets whether to fail on unexpected duplicate calls to callback methods.
71    */
setFailDuplicateCallbacks(boolean fail)72   void setFailDuplicateCallbacks(boolean fail);
73 
74   /**
75    * Returns the next captured outbound message event.
76    */
77   @Nullable
nextOutboundEvent()78   String nextOutboundEvent();
79 
80   /**
81    * Returns the next captured outbound message event.
82    */
nextInboundEvent()83   String nextInboundEvent();
84 
85   /**
86    * A {@link StreamTracer} suitable for testing.
87    */
88   public static class TestBaseStreamTracer extends StreamTracer implements TestStreamTracer {
89 
90     protected final AtomicLong outboundWireSize = new AtomicLong();
91     protected final AtomicLong inboundWireSize = new AtomicLong();
92     protected final AtomicLong outboundUncompressedSize = new AtomicLong();
93     protected final AtomicLong inboundUncompressedSize = new AtomicLong();
94     protected final LinkedBlockingQueue<String> outboundEvents = new LinkedBlockingQueue<String>();
95     protected final LinkedBlockingQueue<String> inboundEvents = new LinkedBlockingQueue<String>();
96     protected final AtomicReference<Status> streamClosedStatus = new AtomicReference<Status>();
97     protected final CountDownLatch streamClosed = new CountDownLatch(1);
98     protected final AtomicBoolean failDuplicateCallbacks = new AtomicBoolean(true);
99 
100     @Override
await()101     public void await() throws InterruptedException {
102       streamClosed.await();
103     }
104 
105     @Override
await(long timeout, TimeUnit timeUnit)106     public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
107       return streamClosed.await(timeout, timeUnit);
108     }
109 
110     @Override
getStatus()111     public Status getStatus() {
112       return streamClosedStatus.get();
113     }
114 
115     @Override
getInboundWireSize()116     public long getInboundWireSize() {
117       return inboundWireSize.get();
118     }
119 
120     @Override
getInboundUncompressedSize()121     public long getInboundUncompressedSize() {
122       return inboundUncompressedSize.get();
123     }
124 
125     @Override
getOutboundWireSize()126     public long getOutboundWireSize() {
127       return outboundWireSize.get();
128     }
129 
130     @Override
getOutboundUncompressedSize()131     public long getOutboundUncompressedSize() {
132       return outboundUncompressedSize.get();
133     }
134 
135     @Override
outboundWireSize(long bytes)136     public void outboundWireSize(long bytes) {
137       outboundWireSize.addAndGet(bytes);
138     }
139 
140     @Override
inboundWireSize(long bytes)141     public void inboundWireSize(long bytes) {
142       inboundWireSize.addAndGet(bytes);
143     }
144 
145     @Override
outboundUncompressedSize(long bytes)146     public void outboundUncompressedSize(long bytes) {
147       outboundUncompressedSize.addAndGet(bytes);
148     }
149 
150     @Override
inboundUncompressedSize(long bytes)151     public void inboundUncompressedSize(long bytes) {
152       inboundUncompressedSize.addAndGet(bytes);
153     }
154 
155     @Override
streamClosed(Status status)156     public void streamClosed(Status status) {
157       if (!streamClosedStatus.compareAndSet(null, status)) {
158         if (failDuplicateCallbacks.get()) {
159           throw new AssertionError("streamClosed called more than once");
160         }
161       } else {
162         streamClosed.countDown();
163       }
164     }
165 
166     @Override
inboundMessage(int seqNo)167     public void inboundMessage(int seqNo) {
168       inboundEvents.add("inboundMessage(" + seqNo + ")");
169     }
170 
171     @Override
outboundMessage(int seqNo)172     public void outboundMessage(int seqNo) {
173       outboundEvents.add("outboundMessage(" + seqNo + ")");
174     }
175 
176     @Override
outboundMessageSent( int seqNo, long optionalWireSize, long optionalUncompressedSize)177     public void outboundMessageSent(
178         int seqNo, long optionalWireSize, long optionalUncompressedSize) {
179       outboundEvents.add(
180           String.format(
181               "outboundMessageSent(%d, %d, %d)",
182               seqNo, optionalWireSize, optionalUncompressedSize));
183     }
184 
185     @Override
inboundMessageRead( int seqNo, long optionalWireSize, long optionalUncompressedSize)186     public void inboundMessageRead(
187         int seqNo, long optionalWireSize, long optionalUncompressedSize) {
188       inboundEvents.add(
189           String.format(
190               "inboundMessageRead(%d, %d, %d)", seqNo, optionalWireSize, optionalUncompressedSize));
191     }
192 
193     @Override
setFailDuplicateCallbacks(boolean fail)194     public void setFailDuplicateCallbacks(boolean fail) {
195       failDuplicateCallbacks.set(fail);
196     }
197 
198     @Override
nextOutboundEvent()199     public String nextOutboundEvent() {
200       return outboundEvents.poll();
201     }
202 
203     @Override
nextInboundEvent()204     public String nextInboundEvent() {
205       return inboundEvents.poll();
206     }
207   }
208 }
209