1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal.testing;
18 
19 import static com.google.common.base.Charsets.UTF_8;
20 import static com.google.common.truth.Truth.assertThat;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertSame;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29 import static org.junit.Assume.assumeTrue;
30 import static org.mockito.Matchers.any;
31 import static org.mockito.Matchers.anyBoolean;
32 import static org.mockito.Matchers.anyString;
33 import static org.mockito.Matchers.eq;
34 import static org.mockito.Matchers.same;
35 import static org.mockito.Mockito.inOrder;
36 import static org.mockito.Mockito.mock;
37 import static org.mockito.Mockito.never;
38 import static org.mockito.Mockito.timeout;
39 import static org.mockito.Mockito.verify;
40 import static org.mockito.Mockito.when;
41 
42 import com.google.common.base.Objects;
43 import com.google.common.collect.Lists;
44 import com.google.common.util.concurrent.MoreExecutors;
45 import com.google.common.util.concurrent.SettableFuture;
46 import io.grpc.Attributes;
47 import io.grpc.CallOptions;
48 import io.grpc.ClientStreamTracer;
49 import io.grpc.Grpc;
50 import io.grpc.InternalChannelz.SocketStats;
51 import io.grpc.InternalChannelz.TransportStats;
52 import io.grpc.InternalInstrumented;
53 import io.grpc.Metadata;
54 import io.grpc.MethodDescriptor;
55 import io.grpc.ServerStreamTracer;
56 import io.grpc.Status;
57 import io.grpc.internal.ClientStream;
58 import io.grpc.internal.ClientStreamListener;
59 import io.grpc.internal.ClientTransport;
60 import io.grpc.internal.ConnectionClientTransport;
61 import io.grpc.internal.GrpcAttributes;
62 import io.grpc.internal.InternalServer;
63 import io.grpc.internal.IoUtils;
64 import io.grpc.internal.ManagedClientTransport;
65 import io.grpc.internal.ServerListener;
66 import io.grpc.internal.ServerStream;
67 import io.grpc.internal.ServerStreamListener;
68 import io.grpc.internal.ServerTransport;
69 import io.grpc.internal.ServerTransportListener;
70 import io.grpc.internal.TimeProvider;
71 import io.grpc.internal.TransportTracer;
72 import java.io.ByteArrayInputStream;
73 import java.io.IOException;
74 import java.io.InputStream;
75 import java.net.SocketAddress;
76 import java.util.Arrays;
77 import java.util.List;
78 import java.util.concurrent.BlockingQueue;
79 import java.util.concurrent.CountDownLatch;
80 import java.util.concurrent.ExecutionException;
81 import java.util.concurrent.Future;
82 import java.util.concurrent.LinkedBlockingQueue;
83 import java.util.concurrent.TimeUnit;
84 import java.util.concurrent.TimeoutException;
85 import org.junit.After;
86 import org.junit.Before;
87 import org.junit.Rule;
88 import org.junit.Test;
89 import org.junit.rules.ExpectedException;
90 import org.junit.runner.RunWith;
91 import org.junit.runners.JUnit4;
92 import org.mockito.ArgumentCaptor;
93 import org.mockito.InOrder;
94 import org.mockito.Matchers;
95 
96 /** Standard unit tests for {@link ClientTransport}s and {@link ServerTransport}s. */
97 @RunWith(JUnit4.class)
98 public abstract class AbstractTransportTest {
99   private static final int TIMEOUT_MS = 1000;
100 
101   private static final Attributes.Key<String> ADDITIONAL_TRANSPORT_ATTR_KEY =
102       Attributes.Key.create("additional-attr");
103 
104   protected final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory(
105       new TimeProvider() {
106         @Override
107         public long currentTimeNanos() {
108           return fakeCurrentTimeNanos();
109         }
110       });
111 
112   /**
113    * Returns a new server that when started will be able to be connected to from the client. Each
114    * returned instance should be new and yet be accessible by new client transports.
115    */
newServer( List<ServerStreamTracer.Factory> streamTracerFactories)116   protected abstract InternalServer newServer(
117       List<ServerStreamTracer.Factory> streamTracerFactories);
118 
119   /**
120    * Builds a new server that is listening on the same location as the given server instance does.
121    */
newServer( InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories)122   protected abstract InternalServer newServer(
123       InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories);
124 
125   /**
126    * Returns a new transport that when started will be able to connect to {@code server}.
127    */
newClientTransport(InternalServer server)128   protected abstract ManagedClientTransport newClientTransport(InternalServer server);
129 
130   /**
131    * Returns the authority string used by a client to connect to {@code server}.
132    */
testAuthority(InternalServer server)133   protected abstract String testAuthority(InternalServer server);
134 
135   /**
136    * Returns true (which is default) if the transport reports message sizes to StreamTracers.
137    */
sizesReported()138   protected boolean sizesReported() {
139     return true;
140   }
141 
142   /**
143    * When non-null, will be shut down during tearDown(). However, it _must_ have been started with
144    * {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following
145    * tests in an indeterminate state.
146    */
147   private InternalServer server;
148   private ServerTransport serverTransport;
149   private ManagedClientTransport client;
150   private MethodDescriptor<String, String> methodDescriptor =
151       MethodDescriptor.<String, String>newBuilder()
152           .setType(MethodDescriptor.MethodType.UNKNOWN)
153           .setFullMethodName("service/method")
154           .setRequestMarshaller(StringMarshaller.INSTANCE)
155           .setResponseMarshaller(StringMarshaller.INSTANCE)
156           .build();
157   private CallOptions callOptions;
158 
159   private Metadata.Key<String> asciiKey = Metadata.Key.of(
160       "ascii-key", Metadata.ASCII_STRING_MARSHALLER);
161   private Metadata.Key<String> binaryKey = Metadata.Key.of(
162       "key-bin", StringBinaryMarshaller.INSTANCE);
163 
164   private ManagedClientTransport.Listener mockClientTransportListener
165       = mock(ManagedClientTransport.Listener.class);
166   private MockServerListener serverListener = new MockServerListener();
167   private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
168   private final ClientStreamTracer.Factory clientStreamTracerFactory =
169       mock(ClientStreamTracer.Factory.class);
170 
171   private final TestClientStreamTracer clientStreamTracer1 = new TestClientStreamTracer();
172   private final TestClientStreamTracer clientStreamTracer2 = new TestClientStreamTracer();
173   private final ServerStreamTracer.Factory serverStreamTracerFactory =
174       mock(ServerStreamTracer.Factory.class);
175   private final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
176   private final TestServerStreamTracer serverStreamTracer2 = new TestServerStreamTracer();
177 
178   @Rule
179   public ExpectedException thrown = ExpectedException.none();
180 
181   @Before
setUp()182   public void setUp() {
183     server = newServer(Arrays.asList(serverStreamTracerFactory));
184     when(clientStreamTracerFactory
185         .newClientStreamTracer(any(CallOptions.class), any(Metadata.class)))
186         .thenReturn(clientStreamTracer1)
187         .thenReturn(clientStreamTracer2);
188     when(serverStreamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
189         .thenReturn(serverStreamTracer1)
190         .thenReturn(serverStreamTracer2);
191     callOptions = CallOptions.DEFAULT.withStreamTracerFactory(clientStreamTracerFactory);
192   }
193 
194   @After
tearDown()195   public void tearDown() throws InterruptedException {
196     if (client != null) {
197       client.shutdownNow(Status.UNKNOWN.withDescription("teardown"));
198     }
199     if (serverTransport != null) {
200       serverTransport.shutdownNow(Status.UNKNOWN.withDescription("teardown"));
201     }
202     if (server != null) {
203       server.shutdown();
204       assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));
205     }
206   }
207 
208   /**
209    * Moves the clock forward, for tests that require moving the clock forward. It is the transport
210    * subclass's responsibility to implement this method.
211    */
advanceClock(long offset, TimeUnit unit)212   protected void advanceClock(long offset, TimeUnit unit) {
213     throw new UnsupportedOperationException();
214   }
215 
216   /**
217    * Returns the current time, for tests that rely on the clock.
218    */
fakeCurrentTimeNanos()219   protected long fakeCurrentTimeNanos() {
220     throw new UnsupportedOperationException();
221   }
222 
223   // TODO(ejona):
224   //   multiple streams on same transport
225   //   multiple client transports to same server
226   //   halfClose to trigger flush (client and server)
227   //   flow control pushes back (client and server)
228   //   flow control provides precisely number of messages requested (client and server)
229   //   onReady called when buffer drained (on server and client)
230   //   test no start reentrancy (esp. during failure) (transport and call)
231   //   multiple requests/responses (verifying contents received)
232   //   server transport shutdown triggers client shutdown (via GOAWAY)
233   //   queued message InputStreams are closed on stream cancel
234   //     (and maybe exceptions handled)
235 
236   /**
237    * Test for issue https://github.com/grpc/grpc-java/issues/1682
238    */
239   @Test
frameAfterRstStreamShouldNotBreakClientChannel()240   public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception {
241     server.start(serverListener);
242     client = newClientTransport(server);
243     startTransport(client, mockClientTransportListener);
244     MockServerTransportListener serverTransportListener
245         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
246     serverTransport = serverTransportListener.transport;
247 
248     // Try to create a sequence of frames so that the client receives a HEADERS or DATA frame
249     // after having sent a RST_STREAM to the server. Previously, this would have broken the
250     // Netty channel.
251 
252     ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
253     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
254     stream.start(clientStreamListener);
255     StreamCreation serverStreamCreation
256         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
257     stream.flush();
258     stream.writeMessage(methodDescriptor.streamRequest("foo"));
259     stream.flush();
260     stream.cancel(Status.CANCELLED);
261     stream.flush();
262     serverStreamCreation.stream.writeHeaders(new Metadata());
263     serverStreamCreation.stream.flush();
264     serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("bar"));
265     serverStreamCreation.stream.flush();
266 
267     assertEquals(
268         Status.CANCELLED, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
269     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
270 
271     ClientStreamListener mockClientStreamListener2 = mock(ClientStreamListener.class);
272 
273     // Test that the channel is still usable i.e. we can receive headers from the server on a
274     // new stream.
275     stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
276     stream.start(mockClientStreamListener2);
277     serverStreamCreation
278         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
279     serverStreamCreation.stream.writeHeaders(new Metadata());
280     serverStreamCreation.stream.flush();
281 
282     verify(mockClientStreamListener2, timeout(250)).headersRead(any(Metadata.class));
283   }
284 
285   @Test
serverNotListening()286   public void serverNotListening() throws Exception {
287     // Start server to just acquire a port.
288     server.start(serverListener);
289     client = newClientTransport(server);
290     server.shutdown();
291     assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));
292     server = null;
293 
294     InOrder inOrder = inOrder(mockClientTransportListener);
295     runIfNotNull(client.start(mockClientTransportListener));
296     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
297     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
298     inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture());
299     assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
300     inOrder.verify(mockClientTransportListener).transportTerminated();
301     verify(mockClientTransportListener, never()).transportReady();
302     verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
303   }
304 
305   @Test
clientStartStop()306   public void clientStartStop() throws Exception {
307     server.start(serverListener);
308     client = newClientTransport(server);
309     InOrder inOrder = inOrder(mockClientTransportListener);
310     startTransport(client, mockClientTransportListener);
311     Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
312     client.shutdown(shutdownReason);
313     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
314     inOrder.verify(mockClientTransportListener).transportShutdown(same(shutdownReason));
315     inOrder.verify(mockClientTransportListener).transportTerminated();
316     verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
317   }
318 
319   @Test
clientStartAndStopOnceConnected()320   public void clientStartAndStopOnceConnected() throws Exception {
321     server.start(serverListener);
322     client = newClientTransport(server);
323     InOrder inOrder = inOrder(mockClientTransportListener);
324     startTransport(client, mockClientTransportListener);
325     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
326     MockServerTransportListener serverTransportListener
327         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
328     client.shutdown(Status.UNAVAILABLE);
329     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
330     inOrder.verify(mockClientTransportListener).transportShutdown(any(Status.class));
331     inOrder.verify(mockClientTransportListener).transportTerminated();
332     assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
333     server.shutdown();
334     assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));
335     server = null;
336     verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
337   }
338 
339   @Test
checkClientAttributes()340   public void checkClientAttributes() throws Exception {
341     server.start(serverListener);
342     client = newClientTransport(server);
343     assumeTrue(client instanceof ConnectionClientTransport);
344     ConnectionClientTransport connectionClient = (ConnectionClientTransport) client;
345     startTransport(connectionClient, mockClientTransportListener);
346     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
347 
348     assertNotNull("security level should be set in client attributes",
349         connectionClient.getAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL));
350   }
351 
352   @Test
serverAlreadyListening()353   public void serverAlreadyListening() throws Exception {
354     client = null;
355     server.start(serverListener);
356     InternalServer server2 = newServer(server, Arrays.asList(serverStreamTracerFactory));
357     thrown.expect(IOException.class);
358     server2.start(new MockServerListener());
359   }
360 
361   @Test
openStreamPreventsTermination()362   public void openStreamPreventsTermination() throws Exception {
363     server.start(serverListener);
364     client = newClientTransport(server);
365     startTransport(client, mockClientTransportListener);
366     MockServerTransportListener serverTransportListener
367         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
368     serverTransport = serverTransportListener.transport;
369 
370     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
371     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
372     clientStream.start(clientStreamListener);
373     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
374     StreamCreation serverStreamCreation
375         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
376     ServerStream serverStream = serverStreamCreation.stream;
377     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
378 
379     client.shutdown(Status.UNAVAILABLE);
380     client = null;
381     server.shutdown();
382     serverTransport.shutdown();
383     serverTransport = null;
384 
385     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
386     assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));
387 
388     // A new server should be able to start listening, since the current server has given up
389     // resources. There may be cases this is impossible in the future, but for now it is a useful
390     // property.
391     serverListener = new MockServerListener();
392     server = newServer(server, Arrays.asList(serverStreamTracerFactory));
393     server.start(serverListener);
394 
395     // Try to "flush" out any listener notifications on client and server. This also ensures that
396     // the stream still functions.
397     serverStream.writeHeaders(new Metadata());
398     clientStream.halfClose();
399     assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
400     assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
401 
402     verify(mockClientTransportListener, never()).transportTerminated();
403     verify(mockClientTransportListener, never()).transportInUse(false);
404     assertFalse(serverTransportListener.isTerminated());
405 
406     clientStream.cancel(Status.CANCELLED);
407 
408     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
409     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
410     assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
411   }
412 
413   @Test
shutdownNowKillsClientStream()414   public void shutdownNowKillsClientStream() throws Exception {
415     server.start(serverListener);
416     client = newClientTransport(server);
417     startTransport(client, mockClientTransportListener);
418     MockServerTransportListener serverTransportListener
419         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
420     serverTransport = serverTransportListener.transport;
421 
422     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
423     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
424     clientStream.start(clientStreamListener);
425     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
426     StreamCreation serverStreamCreation
427         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
428     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
429 
430     Status status = Status.UNKNOWN.withDescription("test shutdownNow");
431     client.shutdownNow(status);
432     client = null;
433 
434     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
435     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
436     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
437     assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
438     assertTrue(serverTransportListener.isTerminated());
439 
440     assertEquals(status, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
441     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
442     Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
443     assertFalse(serverStatus.isOk());
444     assertTrue(clientStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
445     assertStatusEquals(status, clientStreamTracer1.getStatus());
446     assertTrue(serverStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
447     assertStatusEquals(serverStatus, serverStreamTracer1.getStatus());
448   }
449 
450   @Test
shutdownNowKillsServerStream()451   public void shutdownNowKillsServerStream() throws Exception {
452     server.start(serverListener);
453     client = newClientTransport(server);
454     startTransport(client, mockClientTransportListener);
455     MockServerTransportListener serverTransportListener
456         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
457     serverTransport = serverTransportListener.transport;
458 
459     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
460     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
461     clientStream.start(clientStreamListener);
462     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
463     StreamCreation serverStreamCreation
464         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
465     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
466 
467     Status shutdownStatus = Status.UNKNOWN.withDescription("test shutdownNow");
468     serverTransport.shutdownNow(shutdownStatus);
469     serverTransport = null;
470 
471     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
472     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
473     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
474     assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
475     assertTrue(serverTransportListener.isTerminated());
476 
477     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
478     assertFalse(clientStreamStatus.isOk());
479     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
480     assertTrue(clientStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
481     assertStatusEquals(clientStreamStatus, clientStreamTracer1.getStatus());
482     assertTrue(serverStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
483     assertStatusEquals(shutdownStatus, serverStreamTracer1.getStatus());
484 
485     // Generally will be same status provided to shutdownNow, but InProcessTransport can't
486     // differentiate between client and server shutdownNow. The status is not really used on
487     // server-side, so we don't care much.
488     assertNotNull(serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
489   }
490 
491   @Test
ping()492   public void ping() throws Exception {
493     server.start(serverListener);
494     client = newClientTransport(server);
495     startTransport(client, mockClientTransportListener);
496     ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
497     try {
498       client.ping(mockPingCallback, MoreExecutors.directExecutor());
499     } catch (UnsupportedOperationException ex) {
500       // Transport doesn't support ping, so this neither passes nor fails.
501       assumeTrue(false);
502     }
503     verify(mockPingCallback, timeout(TIMEOUT_MS)).onSuccess(Matchers.anyLong());
504     verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
505   }
506 
507   @Test
ping_duringShutdown()508   public void ping_duringShutdown() throws Exception {
509     server.start(serverListener);
510     client = newClientTransport(server);
511     startTransport(client, mockClientTransportListener);
512     // Stream prevents termination
513     ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
514     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
515     stream.start(clientStreamListener);
516     client.shutdown(Status.UNAVAILABLE);
517     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
518     ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
519     try {
520       client.ping(mockPingCallback, MoreExecutors.directExecutor());
521     } catch (UnsupportedOperationException ex) {
522       // Transport doesn't support ping, so this neither passes nor fails.
523       assumeTrue(false);
524     }
525     verify(mockPingCallback, timeout(TIMEOUT_MS)).onSuccess(Matchers.anyLong());
526     stream.cancel(Status.CANCELLED);
527   }
528 
529   @Test
ping_afterTermination()530   public void ping_afterTermination() throws Exception {
531     server.start(serverListener);
532     client = newClientTransport(server);
533     startTransport(client, mockClientTransportListener);
534     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
535     Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
536     client.shutdown(shutdownReason);
537     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
538     ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
539     try {
540       client.ping(mockPingCallback, MoreExecutors.directExecutor());
541     } catch (UnsupportedOperationException ex) {
542       // Transport doesn't support ping, so this neither passes nor fails.
543       assumeTrue(false);
544     }
545     verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture());
546     Status status = Status.fromThrowable(throwableCaptor.getValue());
547     assertSame(shutdownReason, status);
548   }
549 
550   @Test
newStream_duringShutdown()551   public void newStream_duringShutdown() throws Exception {
552     InOrder inOrder = inOrder(clientStreamTracerFactory);
553     server.start(serverListener);
554     client = newClientTransport(server);
555     startTransport(client, mockClientTransportListener);
556     // Stream prevents termination
557     ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
558     inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
559         any(CallOptions.class), any(Metadata.class));
560     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
561     stream.start(clientStreamListener);
562     client.shutdown(Status.UNAVAILABLE);
563     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
564 
565     ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions);
566     inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
567         any(CallOptions.class), any(Metadata.class));
568     ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase();
569     stream2.start(clientStreamListener2);
570     Status clientStreamStatus2 =
571         clientStreamListener2.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
572     assertNotNull(clientStreamListener2.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
573     assertCodeEquals(Status.UNAVAILABLE, clientStreamStatus2);
574     assertSame(clientStreamStatus2, clientStreamTracer2.getStatus());
575 
576     // Make sure earlier stream works.
577     MockServerTransportListener serverTransportListener
578         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
579     serverTransport = serverTransportListener.transport;
580     // TODO(zdapeng): Increased timeout to 20 seconds to see if flakiness of #2328 persists. Take
581     // further action after sufficient observation.
582     StreamCreation serverStreamCreation
583         = serverTransportListener.takeStreamOrFail(20 * TIMEOUT_MS, TimeUnit.MILLISECONDS);
584     serverStreamCreation.stream.close(Status.OK, new Metadata());
585     assertCodeEquals(Status.OK, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
586     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
587   }
588 
589   @Test
newStream_afterTermination()590   public void newStream_afterTermination() throws Exception {
591     // We expect the same general behavior as duringShutdown, but for some transports (e.g., Netty)
592     // dealing with afterTermination is harder than duringShutdown.
593     server.start(serverListener);
594     client = newClientTransport(server);
595     startTransport(client, mockClientTransportListener);
596     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
597     Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
598     client.shutdown(shutdownReason);
599     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
600     Thread.sleep(100);
601     ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
602     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
603     stream.start(clientStreamListener);
604     assertEquals(
605         shutdownReason, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
606     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
607     verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
608     verify(clientStreamTracerFactory).newClientStreamTracer(
609         any(CallOptions.class), any(Metadata.class));
610     assertSame(shutdownReason, clientStreamTracer1.getStatus());
611     // Assert no interactions
612     assertNull(serverStreamTracer1.getServerCallInfo());
613   }
614 
615   @Test
transportInUse_normalClose()616   public void transportInUse_normalClose() throws Exception {
617     server.start(serverListener);
618     client = newClientTransport(server);
619     startTransport(client, mockClientTransportListener);
620     ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions);
621     ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
622     stream1.start(clientStreamListener1);
623     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
624     MockServerTransportListener serverTransportListener
625         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
626     StreamCreation serverStreamCreation1
627         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
628     ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions);
629     ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase();
630     stream2.start(clientStreamListener2);
631     StreamCreation serverStreamCreation2
632         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
633 
634     stream1.halfClose();
635     serverStreamCreation1.stream.close(Status.OK, new Metadata());
636     stream2.halfClose();
637     verify(mockClientTransportListener, never()).transportInUse(false);
638     serverStreamCreation2.stream.close(Status.OK, new Metadata());
639     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
640     // Verify that the callback has been called only once for true and false respectively
641     verify(mockClientTransportListener).transportInUse(true);
642     verify(mockClientTransportListener).transportInUse(false);
643   }
644 
645   @Test
transportInUse_clientCancel()646   public void transportInUse_clientCancel() throws Exception {
647     server.start(serverListener);
648     client = newClientTransport(server);
649     startTransport(client, mockClientTransportListener);
650     ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions);
651     ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
652     stream1.start(clientStreamListener1);
653     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
654     ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions);
655     ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase();
656     stream2.start(clientStreamListener2);
657 
658     stream1.cancel(Status.CANCELLED);
659     verify(mockClientTransportListener, never()).transportInUse(false);
660     stream2.cancel(Status.CANCELLED);
661     verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
662     // Verify that the callback has been called only once for true and false respectively
663     verify(mockClientTransportListener).transportInUse(true);
664     verify(mockClientTransportListener).transportInUse(false);
665   }
666 
667   @Test
668   @SuppressWarnings("deprecation")
basicStream()669   public void basicStream() throws Exception {
670     InOrder clientInOrder = inOrder(clientStreamTracerFactory);
671     InOrder serverInOrder = inOrder(serverStreamTracerFactory);
672     server.start(serverListener);
673     client = newClientTransport(server);
674     startTransport(client, mockClientTransportListener);
675     MockServerTransportListener serverTransportListener
676         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
677     serverTransport = serverTransportListener.transport;
678 
679     Metadata clientHeaders = new Metadata();
680     clientHeaders.put(asciiKey, "client");
681     clientHeaders.put(asciiKey, "dupvalue");
682     clientHeaders.put(asciiKey, "dupvalue");
683     clientHeaders.put(binaryKey, "äbinaryclient");
684     Metadata clientHeadersCopy = new Metadata();
685 
686     clientHeadersCopy.merge(clientHeaders);
687     ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders, callOptions);
688     clientInOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
689         same(callOptions), same(clientHeaders));
690 
691     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
692     clientStream.start(clientStreamListener);
693     StreamCreation serverStreamCreation
694         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
695     assertTrue(clientStreamTracer1.awaitOutboundHeaders(TIMEOUT_MS, TimeUnit.MILLISECONDS));
696     assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
697     assertEquals(Lists.newArrayList(clientHeadersCopy.getAll(asciiKey)),
698         Lists.newArrayList(serverStreamCreation.headers.getAll(asciiKey)));
699     assertEquals(Lists.newArrayList(clientHeadersCopy.getAll(binaryKey)),
700         Lists.newArrayList(serverStreamCreation.headers.getAll(binaryKey)));
701     ServerStream serverStream = serverStreamCreation.stream;
702     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
703 
704     serverInOrder.verify(serverStreamTracerFactory).newServerStreamTracer(
705         eq(methodDescriptor.getFullMethodName()), any(Metadata.class));
706 
707     assertEquals("additional attribute value",
708         serverStream.getAttributes().get(ADDITIONAL_TRANSPORT_ATTR_KEY));
709     assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
710     assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
711 
712     serverStream.request(1);
713     assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
714     assertTrue(clientStream.isReady());
715     clientStream.writeMessage(methodDescriptor.streamRequest("Hello!"));
716     assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
717 
718     clientStream.flush();
719     InputStream message = serverStreamListener.messageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
720     assertEquals("Hello!", methodDescriptor.parseRequest(message));
721     message.close();
722     assertThat(clientStreamTracer1.nextOutboundEvent())
723         .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
724     if (sizesReported()) {
725       assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
726       assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
727     } else {
728       assertThat(clientStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
729       assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
730     }
731     assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
732     assertNull("no additional message expected", serverStreamListener.messageQueue.poll());
733 
734     clientStream.halfClose();
735     assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
736 
737     if (sizesReported()) {
738       assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
739       assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
740     } else {
741       assertThat(serverStreamTracer1.getInboundWireSize()).isEqualTo(0L);
742       assertThat(serverStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
743     }
744     assertThat(serverStreamTracer1.nextInboundEvent())
745         .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
746 
747     Metadata serverHeaders = new Metadata();
748     serverHeaders.put(asciiKey, "server");
749     serverHeaders.put(asciiKey, "dupvalue");
750     serverHeaders.put(asciiKey, "dupvalue");
751     serverHeaders.put(binaryKey, "äbinaryserver");
752     Metadata serverHeadersCopy = new Metadata();
753     serverHeadersCopy.merge(serverHeaders);
754     serverStream.writeHeaders(serverHeaders);
755     Metadata headers = clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
756     assertNotNull(headers);
757     assertEquals(
758         Lists.newArrayList(serverHeadersCopy.getAll(asciiKey)),
759         Lists.newArrayList(headers.getAll(asciiKey)));
760     assertEquals(
761         Lists.newArrayList(serverHeadersCopy.getAll(binaryKey)),
762         Lists.newArrayList(headers.getAll(binaryKey)));
763 
764     clientStream.request(1);
765     assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
766     assertTrue(serverStream.isReady());
767     serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?"));
768     assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
769 
770     serverStream.flush();
771     message = clientStreamListener.messageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
772     assertNotNull("message expected", message);
773     assertThat(serverStreamTracer1.nextOutboundEvent())
774         .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
775     if (sizesReported()) {
776       assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
777       assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
778     } else {
779       assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
780       assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
781     }
782     assertTrue(clientStreamTracer1.getInboundHeaders());
783     assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
784     assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
785     assertThat(clientStreamTracer1.nextInboundEvent())
786         .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
787     if (sizesReported()) {
788       assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
789       assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
790     } else {
791       assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
792       assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
793     }
794 
795     message.close();
796     assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
797 
798     Status status = Status.OK.withDescription("That was normal");
799     Metadata trailers = new Metadata();
800     trailers.put(asciiKey, "trailers");
801     trailers.put(asciiKey, "dupvalue");
802     trailers.put(asciiKey, "dupvalue");
803     trailers.put(binaryKey, "äbinarytrailers");
804     serverStream.close(status, trailers);
805     assertNull(serverStreamTracer1.nextInboundEvent());
806     assertNull(serverStreamTracer1.nextOutboundEvent());
807     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
808     assertSame(status, serverStreamTracer1.getStatus());
809     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
810     Metadata clientStreamTrailers =
811         clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
812     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
813     assertNull(clientStreamTracer1.nextInboundEvent());
814     assertNull(clientStreamTracer1.nextOutboundEvent());
815     assertEquals(status.getCode(), clientStreamStatus.getCode());
816     assertEquals(status.getDescription(), clientStreamStatus.getDescription());
817     assertEquals(
818         Lists.newArrayList(trailers.getAll(asciiKey)),
819         Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
820     assertEquals(
821         Lists.newArrayList(trailers.getAll(binaryKey)),
822         Lists.newArrayList(clientStreamTrailers.getAll(binaryKey)));
823   }
824 
825   @Test
826   @SuppressWarnings("deprecation")
authorityPropagation()827   public void authorityPropagation() throws Exception {
828     server.start(serverListener);
829     client = newClientTransport(server);
830     startTransport(client, mockClientTransportListener);
831     MockServerTransportListener serverTransportListener
832             = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
833 
834     Metadata clientHeaders = new Metadata();
835     ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders, callOptions);
836     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
837     clientStream.start(clientStreamListener);
838     StreamCreation serverStreamCreation
839             = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
840     ServerStream serverStream = serverStreamCreation.stream;
841 
842     assertEquals(testAuthority(server), serverStream.getAuthority());
843   }
844 
845   @Test
zeroMessageStream()846   public void zeroMessageStream() throws Exception {
847     server.start(serverListener);
848     client = newClientTransport(server);
849     startTransport(client, mockClientTransportListener);
850     MockServerTransportListener serverTransportListener
851         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
852     serverTransport = serverTransportListener.transport;
853 
854     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
855     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
856     clientStream.start(clientStreamListener);
857     StreamCreation serverStreamCreation
858         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
859     ServerStream serverStream = serverStreamCreation.stream;
860     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
861 
862     clientStream.halfClose();
863     assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
864 
865     serverStream.writeHeaders(new Metadata());
866     assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
867 
868     Status status = Status.OK.withDescription("Nice talking to you");
869     serverStream.close(status, new Metadata());
870     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
871     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
872     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
873     assertEquals(status.getCode(), clientStreamStatus.getCode());
874     assertEquals(status.getDescription(), clientStreamStatus.getDescription());
875     assertTrue(clientStreamTracer1.getOutboundHeaders());
876     assertTrue(clientStreamTracer1.getInboundHeaders());
877     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
878     assertSame(status, serverStreamTracer1.getStatus());
879   }
880 
881   @Test
earlyServerClose_withServerHeaders()882   public void earlyServerClose_withServerHeaders() throws Exception {
883     server.start(serverListener);
884     client = newClientTransport(server);
885     startTransport(client, mockClientTransportListener);
886     MockServerTransportListener serverTransportListener
887         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
888     serverTransport = serverTransportListener.transport;
889 
890     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
891     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
892     clientStream.start(clientStreamListener);
893     StreamCreation serverStreamCreation
894         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
895     ServerStream serverStream = serverStreamCreation.stream;
896     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
897 
898     serverStream.writeHeaders(new Metadata());
899     assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
900 
901     Status strippedStatus = Status.OK.withDescription("Hello. Goodbye.");
902     Status status = strippedStatus.withCause(new Exception());
903     serverStream.close(status, new Metadata());
904     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
905     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
906     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
907     assertEquals(status.getCode(), clientStreamStatus.getCode());
908     assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription());
909     assertNull(clientStreamStatus.getCause());
910     assertTrue(clientStreamTracer1.getOutboundHeaders());
911     assertTrue(clientStreamTracer1.getInboundHeaders());
912     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
913     assertSame(status, serverStreamTracer1.getStatus());
914   }
915 
916   @Test
earlyServerClose_noServerHeaders()917   public void earlyServerClose_noServerHeaders() throws Exception {
918     server.start(serverListener);
919     client = newClientTransport(server);
920     startTransport(client, mockClientTransportListener);
921     MockServerTransportListener serverTransportListener
922         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
923     serverTransport = serverTransportListener.transport;
924 
925     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
926     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
927     clientStream.start(clientStreamListener);
928     StreamCreation serverStreamCreation
929         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
930     ServerStream serverStream = serverStreamCreation.stream;
931     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
932 
933     Status strippedStatus = Status.OK.withDescription("Hellogoodbye");
934     Status status = strippedStatus.withCause(new Exception());
935     Metadata trailers = new Metadata();
936     trailers.put(asciiKey, "trailers");
937     trailers.put(asciiKey, "dupvalue");
938     trailers.put(asciiKey, "dupvalue");
939     trailers.put(binaryKey, "äbinarytrailers");
940     serverStream.close(status, trailers);
941     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
942     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
943     Metadata clientStreamTrailers =
944         clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
945     assertEquals(status.getCode(), clientStreamStatus.getCode());
946     assertEquals("Hellogoodbye", clientStreamStatus.getDescription());
947     // Cause should not be transmitted to the client.
948     assertNull(clientStreamStatus.getCause());
949     assertEquals(
950         Lists.newArrayList(trailers.getAll(asciiKey)),
951         Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
952     assertEquals(
953         Lists.newArrayList(trailers.getAll(binaryKey)),
954         Lists.newArrayList(clientStreamTrailers.getAll(binaryKey)));
955     assertTrue(clientStreamTracer1.getOutboundHeaders());
956     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
957     assertSame(status, serverStreamTracer1.getStatus());
958   }
959 
960   @Test
earlyServerClose_serverFailure()961   public void earlyServerClose_serverFailure() throws Exception {
962     server.start(serverListener);
963     client = newClientTransport(server);
964     startTransport(client, mockClientTransportListener);
965     MockServerTransportListener serverTransportListener
966         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
967     serverTransport = serverTransportListener.transport;
968 
969     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
970     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
971     clientStream.start(clientStreamListener);
972     StreamCreation serverStreamCreation
973         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
974     ServerStream serverStream = serverStreamCreation.stream;
975     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
976 
977     Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening");
978     Status status = strippedStatus.withCause(new Exception());
979     serverStream.close(status, new Metadata());
980     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
981     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
982     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
983     assertEquals(status.getCode(), clientStreamStatus.getCode());
984     assertEquals(status.getDescription(), clientStreamStatus.getDescription());
985     assertNull(clientStreamStatus.getCause());
986     assertTrue(clientStreamTracer1.getOutboundHeaders());
987     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
988     assertSame(status, serverStreamTracer1.getStatus());
989   }
990 
991   @Test
earlyServerClose_serverFailure_withClientCancelOnListenerClosed()992   public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() throws Exception {
993     server.start(serverListener);
994     client = newClientTransport(server);
995     runIfNotNull(client.start(mockClientTransportListener));
996     MockServerTransportListener serverTransportListener
997         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
998     serverTransport = serverTransportListener.transport;
999 
1000     final ClientStream clientStream =
1001         client.newStream(methodDescriptor, new Metadata(), callOptions);
1002     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase() {
1003       @Override
1004       public void closed(Status status, Metadata trailers) {
1005         super.closed(status, trailers);
1006         // This simulates the blocking calls which can trigger clientStream.cancel().
1007         clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException()));
1008       }
1009 
1010       @Override
1011       public void closed(
1012           Status status, RpcProgress rpcProgress, Metadata trailers) {
1013         super.closed(status, rpcProgress, trailers);
1014         // This simulates the blocking calls which can trigger clientStream.cancel().
1015         clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException()));
1016       }
1017     };
1018     clientStream.start(clientStreamListener);
1019     StreamCreation serverStreamCreation
1020         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1021     ServerStream serverStream = serverStreamCreation.stream;
1022     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1023 
1024     Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening");
1025     Status status = strippedStatus.withCause(new Exception());
1026     serverStream.close(status, new Metadata());
1027     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1028     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1029     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1030     assertEquals(status.getCode(), clientStreamStatus.getCode());
1031     assertEquals(status.getDescription(), clientStreamStatus.getDescription());
1032     assertNull(clientStreamStatus.getCause());
1033     assertTrue(clientStreamTracer1.getOutboundHeaders());
1034     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
1035     assertSame(status, serverStreamTracer1.getStatus());
1036   }
1037 
1038   @Test
clientCancel()1039   public void clientCancel() throws Exception {
1040     server.start(serverListener);
1041     client = newClientTransport(server);
1042     startTransport(client, mockClientTransportListener);
1043     MockServerTransportListener serverTransportListener
1044         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1045     serverTransport = serverTransportListener.transport;
1046 
1047     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1048     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1049     clientStream.start(clientStreamListener);
1050     StreamCreation serverStreamCreation
1051         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1052     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1053 
1054     Status status = Status.CANCELLED.withDescription("Nevermind").withCause(new Exception());
1055     clientStream.cancel(status);
1056     assertEquals(status, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1057     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1058     Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1059     assertNotEquals(Status.Code.OK, serverStatus.getCode());
1060     // Cause should not be transmitted between client and server
1061     assertNull(serverStatus.getCause());
1062 
1063     clientStream.cancel(status);
1064     assertTrue(clientStreamTracer1.getOutboundHeaders());
1065     assertSame(status, clientStreamTracer1.getStatus());
1066     assertSame(serverStatus, serverStreamTracer1.getStatus());
1067   }
1068 
1069   @Test
clientCancelFromWithinMessageRead()1070   public void clientCancelFromWithinMessageRead() throws Exception {
1071     server.start(serverListener);
1072     client = newClientTransport(server);
1073     startTransport(client, mockClientTransportListener);
1074     MockServerTransportListener serverTransportListener
1075         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1076     serverTransport = serverTransportListener.transport;
1077 
1078     final SettableFuture<Boolean> closedCalled = SettableFuture.create();
1079     final ClientStream clientStream =
1080         client.newStream(methodDescriptor, new Metadata(), callOptions);
1081     final Status status = Status.CANCELLED.withDescription("nevermind");
1082     clientStream.start(new ClientStreamListener() {
1083       private boolean messageReceived = false;
1084 
1085       @Override
1086       public void headersRead(Metadata headers) {
1087       }
1088 
1089       @Override
1090       public void closed(Status status, Metadata trailers) {
1091         closed(status, RpcProgress.PROCESSED, trailers);
1092       }
1093 
1094       @Override
1095       public void closed(
1096           Status status, RpcProgress rpcProgress, Metadata trailers) {
1097         assertEquals(Status.CANCELLED.getCode(), status.getCode());
1098         assertEquals("nevermind", status.getDescription());
1099         closedCalled.set(true);
1100       }
1101 
1102       @Override
1103       public void messagesAvailable(MessageProducer producer) {
1104         InputStream message;
1105         while ((message = producer.next()) != null) {
1106           assertFalse("too many messages received", messageReceived);
1107           messageReceived = true;
1108           assertEquals("foo", methodDescriptor.parseResponse(message));
1109           clientStream.cancel(status);
1110         }
1111       }
1112 
1113       @Override
1114       public void onReady() {
1115       }
1116     });
1117     clientStream.halfClose();
1118     clientStream.request(1);
1119 
1120     StreamCreation serverStreamCreation
1121         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1122     assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
1123     ServerStream serverStream = serverStreamCreation.stream;
1124     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1125     assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1126 
1127     assertTrue(serverStream.isReady());
1128     serverStream.writeHeaders(new Metadata());
1129     serverStream.writeMessage(methodDescriptor.streamRequest("foo"));
1130     serverStream.flush();
1131 
1132     // Block until closedCalled was set.
1133     closedCalled.get(5, TimeUnit.SECONDS);
1134 
1135     serverStream.close(Status.OK, new Metadata());
1136     assertTrue(clientStreamTracer1.getOutboundHeaders());
1137     assertTrue(clientStreamTracer1.getInboundHeaders());
1138     if (sizesReported()) {
1139       assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
1140       assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
1141       assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
1142       assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
1143     } else {
1144       assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
1145       assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
1146       assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
1147       assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
1148     }
1149     assertSame(status, clientStreamTracer1.getStatus());
1150     // There is a race between client cancelling and server closing.  The final status seen by the
1151     // server is non-deterministic.
1152     assertTrue(serverStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1153     assertNotNull(serverStreamTracer1.getStatus());
1154   }
1155 
1156   @Test
serverCancel()1157   public void serverCancel() throws Exception {
1158     server.start(serverListener);
1159     client = newClientTransport(server);
1160     startTransport(client, mockClientTransportListener);
1161     MockServerTransportListener serverTransportListener
1162         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1163     serverTransport = serverTransportListener.transport;
1164 
1165     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1166     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1167     clientStream.start(clientStreamListener);
1168     StreamCreation serverStreamCreation
1169         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1170     ServerStream serverStream = serverStreamCreation.stream;
1171     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1172 
1173     Status status = Status.DEADLINE_EXCEEDED.withDescription("It was bound to happen")
1174         .withCause(new Exception());
1175     serverStream.cancel(status);
1176     assertEquals(status, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1177     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1178     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1179     // Presently we can't sent much back to the client in this case. Verify that is the current
1180     // behavior for consistency between transports.
1181     assertCodeEquals(Status.CANCELLED, clientStreamStatus);
1182     // Cause should not be transmitted between server and client
1183     assertNull(clientStreamStatus.getCause());
1184 
1185     verify(clientStreamTracerFactory).newClientStreamTracer(
1186         any(CallOptions.class), any(Metadata.class));
1187     assertTrue(clientStreamTracer1.getOutboundHeaders());
1188     assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
1189     verify(serverStreamTracerFactory).newServerStreamTracer(anyString(), any(Metadata.class));
1190     assertSame(status, serverStreamTracer1.getStatus());
1191 
1192     // Second cancellation shouldn't trigger additional callbacks
1193     serverStream.cancel(status);
1194     doPingPong(serverListener);
1195   }
1196 
1197   @Test
flowControlPushBack()1198   public void flowControlPushBack() throws Exception {
1199     // This test tries to create more streams than the number of distinctive stream tracers that the
1200     // mock factory will return.  This causes the last stream tracer to be returned for more than
1201     // one streams, resulting in duplicate callbacks.  Since we don't care the stream tracers in
1202     // this test, we just disable the check.
1203     clientStreamTracer2.setFailDuplicateCallbacks(false);
1204     serverStreamTracer2.setFailDuplicateCallbacks(false);
1205 
1206     server.start(serverListener);
1207     client = newClientTransport(server);
1208     startTransport(client, mockClientTransportListener);
1209     MockServerTransportListener serverTransportListener =
1210         serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1211     serverTransport = serverTransportListener.transport;
1212 
1213     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1214     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1215     clientStream.start(clientStreamListener);
1216     StreamCreation serverStreamCreation =
1217         serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1218     assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
1219     ServerStream serverStream = serverStreamCreation.stream;
1220     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1221 
1222     serverStream.writeHeaders(new Metadata());
1223 
1224     String largeMessage;
1225     {
1226       int size = 1 * 1024;
1227       StringBuilder sb = new StringBuilder(size);
1228       for (int i = 0; i < size; i++) {
1229         sb.append('a');
1230       }
1231       largeMessage = sb.toString();
1232     }
1233 
1234     serverStream.request(1);
1235     assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1236     assertTrue(clientStream.isReady());
1237     final int maxToSend = 10 * 1024;
1238     int clientSent;
1239     // Verify that flow control will push back on client.
1240     for (clientSent = 0; clientStream.isReady(); clientSent++) {
1241       if (clientSent > maxToSend) {
1242         // It seems like flow control isn't working. _Surely_ flow control would have pushed-back
1243         // already. If this is normal, please configure the transport to buffer less.
1244         fail("Too many messages sent before isReady() returned false");
1245       }
1246       clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage));
1247       clientStream.flush();
1248     }
1249     assertTrue(clientSent > 0);
1250     // Make sure there are at least a few messages buffered.
1251     for (; clientSent < 5; clientSent++) {
1252       clientStream.writeMessage(methodDescriptor.streamResponse(largeMessage));
1253       clientStream.flush();
1254     }
1255     doPingPong(serverListener);
1256 
1257     int serverReceived = verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
1258 
1259     clientStream.request(1);
1260     assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1261     assertTrue(serverStream.isReady());
1262     int serverSent;
1263     // Verify that flow control will push back on server.
1264     for (serverSent = 0; serverStream.isReady(); serverSent++) {
1265       if (serverSent > maxToSend) {
1266         // It seems like flow control isn't working. _Surely_ flow control would have pushed-back
1267         // already. If this is normal, please configure the transport to buffer less.
1268         fail("Too many messages sent before isReady() returned false");
1269       }
1270       serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage));
1271       serverStream.flush();
1272     }
1273     assertTrue(serverSent > 0);
1274     // Make sure there are at least a few messages buffered.
1275     for (; serverSent < 5; serverSent++) {
1276       serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage));
1277       serverStream.flush();
1278     }
1279     doPingPong(serverListener);
1280 
1281     int clientReceived = verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
1282 
1283     serverStream.request(3);
1284     clientStream.request(3);
1285     doPingPong(serverListener);
1286     clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 3);
1287     serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 3);
1288 
1289     // Request the rest
1290     serverStream.request(clientSent);
1291     clientStream.request(serverSent);
1292     clientReceived +=
1293         verifyMessageCountAndClose(clientStreamListener.messageQueue, serverSent - clientReceived);
1294     serverReceived +=
1295         verifyMessageCountAndClose(serverStreamListener.messageQueue, clientSent - serverReceived);
1296 
1297     assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1298     assertTrue(clientStream.isReady());
1299     assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1300     assertTrue(serverStream.isReady());
1301 
1302     // Request four more
1303     for (int i = 0; i < 5; i++) {
1304       clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage));
1305       clientStream.flush();
1306       serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage));
1307       serverStream.flush();
1308     }
1309     doPingPong(serverListener);
1310     clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 4);
1311     serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 4);
1312 
1313     // Drain exactly how many messages are left
1314     serverStream.request(1);
1315     clientStream.request(1);
1316     clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
1317     serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
1318 
1319     // And now check that the streams can still complete gracefully
1320     clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage));
1321     clientStream.flush();
1322     clientStream.halfClose();
1323     doPingPong(serverListener);
1324     assertFalse(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1325 
1326     serverStream.request(1);
1327     serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
1328     assertEquals(clientSent + 6, serverReceived);
1329     assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1330 
1331     serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage));
1332     serverStream.flush();
1333     Status status = Status.OK.withDescription("... quite a lengthy discussion");
1334     serverStream.close(status, new Metadata());
1335     doPingPong(serverListener);
1336     try {
1337       clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1338       fail("Expected TimeoutException");
1339     } catch (TimeoutException expectedException) {
1340     }
1341 
1342     clientStream.request(1);
1343     clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
1344     assertEquals(serverSent + 6, clientReceived);
1345     assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1346     Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1347     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1348     assertEquals(status.getCode(), clientStreamStatus.getCode());
1349     assertEquals(status.getDescription(), clientStreamStatus.getDescription());
1350   }
1351 
verifyMessageCountAndClose(BlockingQueue<InputStream> messageQueue, int count)1352   private int verifyMessageCountAndClose(BlockingQueue<InputStream> messageQueue, int count)
1353       throws Exception {
1354     InputStream message;
1355     for (int i = 0; i < count; i++) {
1356       message = messageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1357       assertNotNull(message);
1358       message.close();
1359     }
1360     assertNull("no additional message expected", messageQueue.poll());
1361     return count;
1362   }
1363 
1364   @Test
interactionsAfterServerStreamCloseAreNoops()1365   public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
1366     server.start(serverListener);
1367     client = newClientTransport(server);
1368     startTransport(client, mockClientTransportListener);
1369     MockServerTransportListener serverTransportListener
1370         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1371     serverTransport = serverTransportListener.transport;
1372 
1373     // boilerplate
1374     ClientStream clientStream =
1375         client.newStream(methodDescriptor, new Metadata(), callOptions);
1376     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1377     clientStream.start(clientStreamListener);
1378     StreamCreation server
1379         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1380 
1381     // setup
1382     clientStream.request(1);
1383     server.stream.close(Status.INTERNAL, new Metadata());
1384     assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1385     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1386 
1387     // Ensure that for a closed ServerStream, interactions are noops
1388     server.stream.writeHeaders(new Metadata());
1389     server.stream.writeMessage(methodDescriptor.streamResponse("response"));
1390     server.stream.close(Status.INTERNAL, new Metadata());
1391 
1392     // Make sure new streams still work properly
1393     doPingPong(serverListener);
1394   }
1395 
1396   @Test
interactionsAfterClientStreamCancelAreNoops()1397   public void interactionsAfterClientStreamCancelAreNoops() throws Exception {
1398     server.start(serverListener);
1399     client = newClientTransport(server);
1400     startTransport(client, mockClientTransportListener);
1401     MockServerTransportListener serverTransportListener
1402         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1403     serverTransport = serverTransportListener.transport;
1404 
1405     // boilerplate
1406     ClientStream clientStream =
1407         client.newStream(methodDescriptor, new Metadata(), callOptions);
1408     ClientStreamListener clientListener = mock(ClientStreamListener.class);
1409     clientStream.start(clientListener);
1410     StreamCreation server
1411         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1412 
1413     // setup
1414     server.stream.request(1);
1415     clientStream.cancel(Status.UNKNOWN);
1416     assertNotNull(server.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1417 
1418     // Ensure that for a cancelled ClientStream, interactions are noops
1419     clientStream.writeMessage(methodDescriptor.streamRequest("request"));
1420     clientStream.halfClose();
1421     clientStream.cancel(Status.UNKNOWN);
1422 
1423     // Make sure new streams still work properly
1424     doPingPong(serverListener);
1425   }
1426 
1427   // Not all transports support the tracer yet
haveTransportTracer()1428   protected boolean haveTransportTracer() {
1429     return false;
1430   }
1431 
1432   @Test
transportTracer_streamStarted()1433   public void transportTracer_streamStarted() throws Exception {
1434     server.start(serverListener);
1435     client = newClientTransport(server);
1436     startTransport(client, mockClientTransportListener);
1437     MockServerTransportListener serverTransportListener
1438         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1439     if (!haveTransportTracer()) {
1440       return;
1441     }
1442 
1443     // start first stream
1444     long serverFirstTimestampNanos;
1445     long clientFirstTimestampNanos;
1446     {
1447       TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1448       assertEquals(0, serverBefore.streamsStarted);
1449       assertEquals(0, serverBefore.lastRemoteStreamCreatedTimeNanos);
1450       TransportStats clientBefore = getTransportStats(client);
1451       assertEquals(0, clientBefore.streamsStarted);
1452       assertEquals(0, clientBefore.lastRemoteStreamCreatedTimeNanos);
1453 
1454       ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1455       ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1456       clientStream.start(clientStreamListener);
1457       StreamCreation serverStreamCreation = serverTransportListener
1458           .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1459 
1460       TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1461       assertEquals(1, serverAfter.streamsStarted);
1462       serverFirstTimestampNanos = serverAfter.lastRemoteStreamCreatedTimeNanos;
1463       assertEquals(fakeCurrentTimeNanos(), serverAfter.lastRemoteStreamCreatedTimeNanos);
1464 
1465       TransportStats clientAfter = getTransportStats(client);
1466       assertEquals(1, clientAfter.streamsStarted);
1467       clientFirstTimestampNanos = clientAfter.lastLocalStreamCreatedTimeNanos;
1468       assertEquals(fakeCurrentTimeNanos(), clientFirstTimestampNanos);
1469 
1470       ServerStream serverStream = serverStreamCreation.stream;
1471       serverStream.close(Status.OK, new Metadata());
1472     }
1473 
1474     final long elapsedMillis = 100;
1475     advanceClock(100, TimeUnit.MILLISECONDS);
1476 
1477     // start second stream
1478     {
1479       TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1480       assertEquals(1, serverBefore.streamsStarted);
1481       TransportStats clientBefore = getTransportStats(client);
1482       assertEquals(1, clientBefore.streamsStarted);
1483 
1484       ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1485       ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1486       clientStream.start(clientStreamListener);
1487       StreamCreation serverStreamCreation = serverTransportListener
1488           .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1489 
1490       TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1491       assertEquals(2, serverAfter.streamsStarted);
1492       assertEquals(
1493           TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
1494           serverAfter.lastRemoteStreamCreatedTimeNanos - serverFirstTimestampNanos);
1495       assertEquals(fakeCurrentTimeNanos(), serverAfter.lastRemoteStreamCreatedTimeNanos);
1496 
1497       TransportStats clientAfter = getTransportStats(client);
1498       assertEquals(2, clientAfter.streamsStarted);
1499       assertEquals(
1500           TimeUnit.MILLISECONDS.toNanos(elapsedMillis),
1501           clientAfter.lastLocalStreamCreatedTimeNanos - clientFirstTimestampNanos);
1502       assertEquals(fakeCurrentTimeNanos(), clientAfter.lastLocalStreamCreatedTimeNanos);
1503 
1504       ServerStream serverStream = serverStreamCreation.stream;
1505       serverStream.close(Status.OK, new Metadata());
1506     }
1507   }
1508 
1509   @Test
transportTracer_server_streamEnded_ok()1510   public void transportTracer_server_streamEnded_ok() throws Exception {
1511     server.start(serverListener);
1512     client = newClientTransport(server);
1513     startTransport(client, mockClientTransportListener);
1514     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1515     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1516     clientStream.start(clientStreamListener);
1517     MockServerTransportListener serverTransportListener
1518         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1519     StreamCreation serverStreamCreation
1520         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1521     ServerStream serverStream = serverStreamCreation.stream;
1522     if (!haveTransportTracer()) {
1523       return;
1524     }
1525 
1526     TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1527     assertEquals(0, serverBefore.streamsSucceeded);
1528     assertEquals(0, serverBefore.streamsFailed);
1529     TransportStats clientBefore = getTransportStats(client);
1530     assertEquals(0, clientBefore.streamsSucceeded);
1531     assertEquals(0, clientBefore.streamsFailed);
1532 
1533     clientStream.halfClose();
1534     serverStream.close(Status.OK, new Metadata());
1535     // do not validate stats until close() has been called on client
1536     assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1537     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1538 
1539 
1540     TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1541     assertEquals(1, serverAfter.streamsSucceeded);
1542     assertEquals(0, serverAfter.streamsFailed);
1543     TransportStats clientAfter = getTransportStats(client);
1544     assertEquals(1, clientAfter.streamsSucceeded);
1545     assertEquals(0, clientAfter.streamsFailed);
1546   }
1547 
1548   @Test
transportTracer_server_streamEnded_nonOk()1549   public void transportTracer_server_streamEnded_nonOk() throws Exception {
1550     server.start(serverListener);
1551     client = newClientTransport(server);
1552     startTransport(client, mockClientTransportListener);
1553     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1554     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1555     clientStream.start(clientStreamListener);
1556     MockServerTransportListener serverTransportListener
1557         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1558     StreamCreation serverStreamCreation
1559         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1560     ServerStream serverStream = serverStreamCreation.stream;
1561     if (!haveTransportTracer()) {
1562       return;
1563     }
1564 
1565     TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1566     assertEquals(0, serverBefore.streamsFailed);
1567     assertEquals(0, serverBefore.streamsSucceeded);
1568     TransportStats clientBefore = getTransportStats(client);
1569     assertEquals(0, clientBefore.streamsFailed);
1570     assertEquals(0, clientBefore.streamsSucceeded);
1571 
1572     serverStream.close(Status.UNKNOWN, new Metadata());
1573     // do not validate stats until close() has been called on client
1574     assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1575     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1576 
1577 
1578     TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1579     assertEquals(1, serverAfter.streamsFailed);
1580     assertEquals(0, serverAfter.streamsSucceeded);
1581     TransportStats clientAfter = getTransportStats(client);
1582     assertEquals(1, clientAfter.streamsFailed);
1583     assertEquals(0, clientAfter.streamsSucceeded);
1584 
1585     client.shutdown(Status.UNAVAILABLE);
1586   }
1587 
1588   @Test
transportTracer_client_streamEnded_nonOk()1589   public void transportTracer_client_streamEnded_nonOk() throws Exception {
1590     server.start(serverListener);
1591     client = newClientTransport(server);
1592     startTransport(client, mockClientTransportListener);
1593     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1594     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1595     clientStream.start(clientStreamListener);
1596     MockServerTransportListener serverTransportListener =
1597         serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1598     StreamCreation serverStreamCreation =
1599         serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1600     if (!haveTransportTracer()) {
1601       return;
1602     }
1603 
1604     TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1605     assertEquals(0, serverBefore.streamsFailed);
1606     assertEquals(0, serverBefore.streamsSucceeded);
1607     TransportStats clientBefore = getTransportStats(client);
1608     assertEquals(0, clientBefore.streamsFailed);
1609     assertEquals(0, clientBefore.streamsSucceeded);
1610 
1611     clientStream.cancel(Status.UNKNOWN);
1612     // do not validate stats until close() has been called on server
1613     assertNotNull(serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1614 
1615     TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1616     assertEquals(1, serverAfter.streamsFailed);
1617     assertEquals(0, serverAfter.streamsSucceeded);
1618     TransportStats clientAfter = getTransportStats(client);
1619     assertEquals(1, clientAfter.streamsFailed);
1620     assertEquals(0, clientAfter.streamsSucceeded);
1621   }
1622 
1623   @Test
transportTracer_server_receive_msg()1624   public void transportTracer_server_receive_msg() throws Exception {
1625     server.start(serverListener);
1626     client = newClientTransport(server);
1627     startTransport(client, mockClientTransportListener);
1628     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1629     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1630     clientStream.start(clientStreamListener);
1631     MockServerTransportListener serverTransportListener
1632         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1633     StreamCreation serverStreamCreation
1634         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1635     ServerStream serverStream = serverStreamCreation.stream;
1636     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1637     if (!haveTransportTracer()) {
1638       return;
1639     }
1640 
1641     TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1642     assertEquals(0, serverBefore.messagesReceived);
1643     assertEquals(0, serverBefore.lastMessageReceivedTimeNanos);
1644     TransportStats clientBefore = getTransportStats(client);
1645     assertEquals(0, clientBefore.messagesSent);
1646     assertEquals(0, clientBefore.lastMessageSentTimeNanos);
1647 
1648     serverStream.request(1);
1649     clientStream.writeMessage(methodDescriptor.streamRequest("request"));
1650     clientStream.flush();
1651     clientStream.halfClose();
1652     verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
1653 
1654     TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1655     assertEquals(1, serverAfter.messagesReceived);
1656     assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageReceivedTimeNanos);
1657     TransportStats clientAfter = getTransportStats(client);
1658     assertEquals(1, clientAfter.messagesSent);
1659     assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageSentTimeNanos);
1660 
1661     serverStream.close(Status.OK, new Metadata());
1662   }
1663 
1664   @Test
transportTracer_server_send_msg()1665   public void transportTracer_server_send_msg() throws Exception {
1666     server.start(serverListener);
1667     client = newClientTransport(server);
1668     startTransport(client, mockClientTransportListener);
1669     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1670     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1671     clientStream.start(clientStreamListener);
1672     MockServerTransportListener serverTransportListener
1673         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1674     StreamCreation serverStreamCreation
1675         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1676     ServerStream serverStream = serverStreamCreation.stream;
1677     if (!haveTransportTracer()) {
1678       return;
1679     }
1680 
1681     TransportStats serverBefore = getTransportStats(serverTransportListener.transport);
1682     assertEquals(0, serverBefore.messagesSent);
1683     assertEquals(0, serverBefore.lastMessageSentTimeNanos);
1684     TransportStats clientBefore = getTransportStats(client);
1685     assertEquals(0, clientBefore.messagesReceived);
1686     assertEquals(0, clientBefore.lastMessageReceivedTimeNanos);
1687 
1688     clientStream.request(1);
1689     serverStream.writeHeaders(new Metadata());
1690     serverStream.writeMessage(methodDescriptor.streamResponse("response"));
1691     serverStream.flush();
1692     verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
1693 
1694     TransportStats serverAfter = getTransportStats(serverTransportListener.transport);
1695     assertEquals(1, serverAfter.messagesSent);
1696     assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageSentTimeNanos);
1697     TransportStats clientAfter = getTransportStats(client);
1698     assertEquals(1, clientAfter.messagesReceived);
1699     assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageReceivedTimeNanos);
1700 
1701     serverStream.close(Status.OK, new Metadata());
1702   }
1703 
1704   @Test
socketStats()1705   public void socketStats() throws Exception {
1706     server.start(serverListener);
1707     ManagedClientTransport client = newClientTransport(server);
1708     startTransport(client, mockClientTransportListener);
1709     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1710     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1711     clientStream.start(clientStreamListener);
1712 
1713     MockServerTransportListener serverTransportListener
1714         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1715     StreamCreation serverStreamCreation
1716         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1717     ServerStream serverStream = serverStreamCreation.stream;
1718 
1719     SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
1720     SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
1721 
1722     SocketStats clientSocketStats = client.getStats().get();
1723     assertEquals(clientAddress, clientSocketStats.local);
1724     assertEquals(serverAddress, clientSocketStats.remote);
1725     // very basic sanity check that socket options are populated
1726     assertNotNull(clientSocketStats.socketOptions.lingerSeconds);
1727     assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
1728 
1729     SocketStats serverSocketStats = serverTransportListener.transport.getStats().get();
1730     assertEquals(serverAddress, serverSocketStats.local);
1731     assertEquals(clientAddress, serverSocketStats.remote);
1732     // very basic sanity check that socket options are populated
1733     assertNotNull(serverSocketStats.socketOptions.lingerSeconds);
1734     assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
1735   }
1736 
1737   /**
1738    * Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give
1739    * time for actions _not_ to happen. Since it is based on doing an actual RPC with actual
1740    * callbacks, it generally provides plenty of time for Runnables to execute. But it is also faster
1741    * on faster machines and more reliable on slower machines.
1742    */
doPingPong(MockServerListener serverListener)1743   private void doPingPong(MockServerListener serverListener) throws Exception {
1744     ManagedClientTransport client = newClientTransport(server);
1745     ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
1746     startTransport(client, listener);
1747     ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
1748     ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
1749     clientStream.start(clientStreamListener);
1750 
1751     MockServerTransportListener serverTransportListener
1752         = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1753     StreamCreation serverStreamCreation
1754         = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
1755     ServerStream serverStream = serverStreamCreation.stream;
1756     ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
1757 
1758     serverStream.close(Status.OK, new Metadata());
1759     assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1760     assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1761     assertNotNull(serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
1762     client.shutdown(Status.UNAVAILABLE);
1763   }
1764 
1765   /**
1766    * Only assert that the Status.Code matches, but provide the entire actual result in case the
1767    * assertion fails.
1768    */
assertCodeEquals(String message, Status expected, Status actual)1769   private static void assertCodeEquals(String message, Status expected, Status actual) {
1770     if (expected == null) {
1771       fail("expected should not be null");
1772     }
1773     if (actual == null || !expected.getCode().equals(actual.getCode())) {
1774       assertEquals(message, expected, actual);
1775     }
1776   }
1777 
assertCodeEquals(Status expected, Status actual)1778   private static void assertCodeEquals(Status expected, Status actual) {
1779     assertCodeEquals(null, expected, actual);
1780   }
1781 
assertStatusEquals(Status expected, Status actual)1782   private static void assertStatusEquals(Status expected, Status actual) {
1783     if (expected == null) {
1784       fail("expected should not be null");
1785     }
1786     if (actual == null || !expected.getCode().equals(actual.getCode())
1787         || !Objects.equal(expected.getDescription(), actual.getDescription())
1788         || !Objects.equal(expected.getCause(), actual.getCause())) {
1789       assertEquals(expected, actual);
1790     }
1791   }
1792 
waitForFuture(Future<?> future, long timeout, TimeUnit unit)1793   private static boolean waitForFuture(Future<?> future, long timeout, TimeUnit unit)
1794       throws InterruptedException {
1795     try {
1796       future.get(timeout, unit);
1797     } catch (ExecutionException ex) {
1798       throw new AssertionError(ex);
1799     } catch (TimeoutException ex) {
1800       return false;
1801     }
1802     return true;
1803   }
1804 
runIfNotNull(Runnable runnable)1805   private static void runIfNotNull(Runnable runnable) {
1806     if (runnable != null) {
1807       runnable.run();
1808     }
1809   }
1810 
startTransport( ManagedClientTransport clientTransport, ManagedClientTransport.Listener listener)1811   private static void startTransport(
1812       ManagedClientTransport clientTransport,
1813       ManagedClientTransport.Listener listener) {
1814     runIfNotNull(clientTransport.start(listener));
1815     verify(listener, timeout(100)).transportReady();
1816   }
1817 
1818   private static class MockServerListener implements ServerListener {
1819     public final BlockingQueue<MockServerTransportListener> listeners
1820         = new LinkedBlockingQueue<MockServerTransportListener>();
1821     private final SettableFuture<?> shutdown = SettableFuture.create();
1822 
1823     @Override
transportCreated(ServerTransport transport)1824     public ServerTransportListener transportCreated(ServerTransport transport) {
1825       MockServerTransportListener listener = new MockServerTransportListener(transport);
1826       listeners.add(listener);
1827       return listener;
1828     }
1829 
1830     @Override
serverShutdown()1831     public void serverShutdown() {
1832       assertTrue(shutdown.set(null));
1833     }
1834 
waitForShutdown(long timeout, TimeUnit unit)1835     public boolean waitForShutdown(long timeout, TimeUnit unit) throws InterruptedException {
1836       return waitForFuture(shutdown, timeout, unit);
1837     }
1838 
takeListenerOrFail(long timeout, TimeUnit unit)1839     public MockServerTransportListener takeListenerOrFail(long timeout, TimeUnit unit)
1840         throws InterruptedException {
1841       MockServerTransportListener listener = listeners.poll(timeout, unit);
1842       if (listener == null) {
1843         fail("Timed out waiting for server transport");
1844       }
1845       return listener;
1846     }
1847   }
1848 
1849   private static class MockServerTransportListener implements ServerTransportListener {
1850     public final ServerTransport transport;
1851     public final BlockingQueue<StreamCreation> streams = new LinkedBlockingQueue<StreamCreation>();
1852     private final SettableFuture<?> terminated = SettableFuture.create();
1853 
MockServerTransportListener(ServerTransport transport)1854     public MockServerTransportListener(ServerTransport transport) {
1855       this.transport = transport;
1856     }
1857 
1858     @Override
streamCreated(ServerStream stream, String method, Metadata headers)1859     public void streamCreated(ServerStream stream, String method, Metadata headers) {
1860       ServerStreamListenerBase listener = new ServerStreamListenerBase();
1861       streams.add(new StreamCreation(stream, method, headers, listener));
1862       stream.setListener(listener);
1863     }
1864 
1865     @Override
transportReady(Attributes attributes)1866     public Attributes transportReady(Attributes attributes) {
1867       return Attributes.newBuilder()
1868           .setAll(attributes)
1869           .set(ADDITIONAL_TRANSPORT_ATTR_KEY, "additional attribute value")
1870           .build();
1871     }
1872 
1873     @Override
transportTerminated()1874     public void transportTerminated() {
1875       assertTrue(terminated.set(null));
1876     }
1877 
waitForTermination(long timeout, TimeUnit unit)1878     public boolean waitForTermination(long timeout, TimeUnit unit) throws InterruptedException {
1879       return waitForFuture(terminated, timeout, unit);
1880     }
1881 
isTerminated()1882     public boolean isTerminated() {
1883       return terminated.isDone();
1884     }
1885 
takeStreamOrFail(long timeout, TimeUnit unit)1886     public StreamCreation takeStreamOrFail(long timeout, TimeUnit unit)
1887         throws InterruptedException {
1888       StreamCreation stream = streams.poll(timeout, unit);
1889       if (stream == null) {
1890         fail("Timed out waiting for server stream");
1891       }
1892       return stream;
1893     }
1894   }
1895 
1896   private static class ServerStreamListenerBase implements ServerStreamListener {
1897     private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>();
1898     // Would have used Void instead of Object, but null elements are not allowed
1899     private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<Object>();
1900     private final CountDownLatch halfClosedLatch = new CountDownLatch(1);
1901     private final SettableFuture<Status> status = SettableFuture.create();
1902 
awaitOnReady(int timeout, TimeUnit unit)1903     private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
1904       return readyQueue.poll(timeout, unit) != null;
1905     }
1906 
awaitOnReadyAndDrain(int timeout, TimeUnit unit)1907     private boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception {
1908       if (!awaitOnReady(timeout, unit)) {
1909         return false;
1910       }
1911       // Throw the rest away
1912       readyQueue.drainTo(Lists.newArrayList());
1913       return true;
1914     }
1915 
awaitHalfClosed(int timeout, TimeUnit unit)1916     private boolean awaitHalfClosed(int timeout, TimeUnit unit) throws Exception {
1917       return halfClosedLatch.await(timeout, unit);
1918     }
1919 
1920     @Override
messagesAvailable(MessageProducer producer)1921     public void messagesAvailable(MessageProducer producer) {
1922       if (status.isDone()) {
1923         fail("messagesAvailable invoked after closed");
1924       }
1925       InputStream message;
1926       while ((message = producer.next()) != null) {
1927         messageQueue.add(message);
1928       }
1929     }
1930 
1931     @Override
onReady()1932     public void onReady() {
1933       if (status.isDone()) {
1934         fail("onReady invoked after closed");
1935       }
1936       readyQueue.add(new Object());
1937     }
1938 
1939     @Override
halfClosed()1940     public void halfClosed() {
1941       if (status.isDone()) {
1942         fail("halfClosed invoked after closed");
1943       }
1944       halfClosedLatch.countDown();
1945     }
1946 
1947     @Override
closed(Status status)1948     public void closed(Status status) {
1949       if (this.status.isDone()) {
1950         fail("closed invoked more than once");
1951       }
1952       this.status.set(status);
1953     }
1954   }
1955 
1956   private static class ClientStreamListenerBase implements ClientStreamListener {
1957     private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>();
1958     // Would have used Void instead of Object, but null elements are not allowed
1959     private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<Object>();
1960     private final SettableFuture<Metadata> headers = SettableFuture.create();
1961     private final SettableFuture<Metadata> trailers = SettableFuture.create();
1962     private final SettableFuture<Status> status = SettableFuture.create();
1963 
awaitOnReady(int timeout, TimeUnit unit)1964     private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
1965       return readyQueue.poll(timeout, unit) != null;
1966     }
1967 
awaitOnReadyAndDrain(int timeout, TimeUnit unit)1968     private boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception {
1969       if (!awaitOnReady(timeout, unit)) {
1970         return false;
1971       }
1972       // Throw the rest away
1973       readyQueue.drainTo(Lists.newArrayList());
1974       return true;
1975     }
1976 
1977     @Override
messagesAvailable(MessageProducer producer)1978     public void messagesAvailable(MessageProducer producer) {
1979       if (status.isDone()) {
1980         fail("messagesAvailable invoked after closed");
1981       }
1982       InputStream message;
1983       while ((message = producer.next()) != null) {
1984         messageQueue.add(message);
1985       }
1986     }
1987 
1988     @Override
onReady()1989     public void onReady() {
1990       if (status.isDone()) {
1991         fail("onReady invoked after closed");
1992       }
1993       readyQueue.add(new Object());
1994     }
1995 
1996     @Override
headersRead(Metadata headers)1997     public void headersRead(Metadata headers) {
1998       if (status.isDone()) {
1999         fail("headersRead invoked after closed");
2000       }
2001       this.headers.set(headers);
2002     }
2003 
2004     @Override
closed(Status status, Metadata trailers)2005     public void closed(Status status, Metadata trailers) {
2006       closed(status, RpcProgress.PROCESSED, trailers);
2007     }
2008 
2009     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)2010     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
2011       if (this.status.isDone()) {
2012         fail("headersRead invoked after closed");
2013       }
2014       this.status.set(status);
2015       this.trailers.set(trailers);
2016     }
2017   }
2018 
2019   private static class StreamCreation {
2020     public final ServerStream stream;
2021     public final String method;
2022     public final Metadata headers;
2023     public final ServerStreamListenerBase listener;
2024 
StreamCreation( ServerStream stream, String method, Metadata headers, ServerStreamListenerBase listener)2025     public StreamCreation(
2026         ServerStream stream, String method, Metadata headers, ServerStreamListenerBase listener) {
2027       this.stream = stream;
2028       this.method = method;
2029       this.headers = headers;
2030       this.listener = listener;
2031     }
2032   }
2033 
2034   private static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
2035     public static final StringMarshaller INSTANCE = new StringMarshaller();
2036 
2037     @Override
stream(String value)2038     public InputStream stream(String value) {
2039       return new ByteArrayInputStream(value.getBytes(UTF_8));
2040     }
2041 
2042     @Override
parse(InputStream stream)2043     public String parse(InputStream stream) {
2044       try {
2045         return new String(IoUtils.toByteArray(stream), UTF_8);
2046       } catch (IOException ex) {
2047         throw new RuntimeException(ex);
2048       }
2049     }
2050   }
2051 
2052   private static class StringBinaryMarshaller implements Metadata.BinaryMarshaller<String> {
2053     public static final StringBinaryMarshaller INSTANCE = new StringBinaryMarshaller();
2054 
2055     @Override
toBytes(String value)2056     public byte[] toBytes(String value) {
2057       return value.getBytes(UTF_8);
2058     }
2059 
2060     @Override
parseBytes(byte[] serialized)2061     public String parseBytes(byte[] serialized) {
2062       return new String(serialized, UTF_8);
2063     }
2064   }
2065 
getTransportStats(InternalInstrumented<SocketStats> socket)2066   private static TransportStats getTransportStats(InternalInstrumented<SocketStats> socket)
2067       throws ExecutionException, InterruptedException {
2068     return socket.getStats().get().data;
2069   }
2070 }
2071