1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal.testing; 18 19 import static com.google.common.base.Charsets.UTF_8; 20 import static com.google.common.truth.Truth.assertThat; 21 import static org.junit.Assert.assertEquals; 22 import static org.junit.Assert.assertFalse; 23 import static org.junit.Assert.assertNotEquals; 24 import static org.junit.Assert.assertNotNull; 25 import static org.junit.Assert.assertNull; 26 import static org.junit.Assert.assertSame; 27 import static org.junit.Assert.assertTrue; 28 import static org.junit.Assert.fail; 29 import static org.junit.Assume.assumeTrue; 30 import static org.mockito.Matchers.any; 31 import static org.mockito.Matchers.anyBoolean; 32 import static org.mockito.Matchers.anyString; 33 import static org.mockito.Matchers.eq; 34 import static org.mockito.Matchers.same; 35 import static org.mockito.Mockito.inOrder; 36 import static org.mockito.Mockito.mock; 37 import static org.mockito.Mockito.never; 38 import static org.mockito.Mockito.timeout; 39 import static org.mockito.Mockito.verify; 40 import static org.mockito.Mockito.when; 41 42 import com.google.common.base.Objects; 43 import com.google.common.collect.Lists; 44 import com.google.common.util.concurrent.MoreExecutors; 45 import com.google.common.util.concurrent.SettableFuture; 46 import io.grpc.Attributes; 47 import io.grpc.CallOptions; 48 import io.grpc.ClientStreamTracer; 49 import io.grpc.Grpc; 50 import io.grpc.InternalChannelz.SocketStats; 51 import io.grpc.InternalChannelz.TransportStats; 52 import io.grpc.InternalInstrumented; 53 import io.grpc.Metadata; 54 import io.grpc.MethodDescriptor; 55 import io.grpc.ServerStreamTracer; 56 import io.grpc.Status; 57 import io.grpc.internal.ClientStream; 58 import io.grpc.internal.ClientStreamListener; 59 import io.grpc.internal.ClientTransport; 60 import io.grpc.internal.ConnectionClientTransport; 61 import io.grpc.internal.GrpcAttributes; 62 import io.grpc.internal.InternalServer; 63 import io.grpc.internal.IoUtils; 64 import io.grpc.internal.ManagedClientTransport; 65 import io.grpc.internal.ServerListener; 66 import io.grpc.internal.ServerStream; 67 import io.grpc.internal.ServerStreamListener; 68 import io.grpc.internal.ServerTransport; 69 import io.grpc.internal.ServerTransportListener; 70 import io.grpc.internal.TimeProvider; 71 import io.grpc.internal.TransportTracer; 72 import java.io.ByteArrayInputStream; 73 import java.io.IOException; 74 import java.io.InputStream; 75 import java.net.SocketAddress; 76 import java.util.Arrays; 77 import java.util.List; 78 import java.util.concurrent.BlockingQueue; 79 import java.util.concurrent.CountDownLatch; 80 import java.util.concurrent.ExecutionException; 81 import java.util.concurrent.Future; 82 import java.util.concurrent.LinkedBlockingQueue; 83 import java.util.concurrent.TimeUnit; 84 import java.util.concurrent.TimeoutException; 85 import org.junit.After; 86 import org.junit.Before; 87 import org.junit.Rule; 88 import org.junit.Test; 89 import org.junit.rules.ExpectedException; 90 import org.junit.runner.RunWith; 91 import org.junit.runners.JUnit4; 92 import org.mockito.ArgumentCaptor; 93 import org.mockito.InOrder; 94 import org.mockito.Matchers; 95 96 /** Standard unit tests for {@link ClientTransport}s and {@link ServerTransport}s. */ 97 @RunWith(JUnit4.class) 98 public abstract class AbstractTransportTest { 99 private static final int TIMEOUT_MS = 1000; 100 101 private static final Attributes.Key<String> ADDITIONAL_TRANSPORT_ATTR_KEY = 102 Attributes.Key.create("additional-attr"); 103 104 protected final TransportTracer.Factory fakeClockTransportTracer = new TransportTracer.Factory( 105 new TimeProvider() { 106 @Override 107 public long currentTimeNanos() { 108 return fakeCurrentTimeNanos(); 109 } 110 }); 111 112 /** 113 * Returns a new server that when started will be able to be connected to from the client. Each 114 * returned instance should be new and yet be accessible by new client transports. 115 */ newServer( List<ServerStreamTracer.Factory> streamTracerFactories)116 protected abstract InternalServer newServer( 117 List<ServerStreamTracer.Factory> streamTracerFactories); 118 119 /** 120 * Builds a new server that is listening on the same location as the given server instance does. 121 */ newServer( InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories)122 protected abstract InternalServer newServer( 123 InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories); 124 125 /** 126 * Returns a new transport that when started will be able to connect to {@code server}. 127 */ newClientTransport(InternalServer server)128 protected abstract ManagedClientTransport newClientTransport(InternalServer server); 129 130 /** 131 * Returns the authority string used by a client to connect to {@code server}. 132 */ testAuthority(InternalServer server)133 protected abstract String testAuthority(InternalServer server); 134 135 /** 136 * Returns true (which is default) if the transport reports message sizes to StreamTracers. 137 */ sizesReported()138 protected boolean sizesReported() { 139 return true; 140 } 141 142 /** 143 * When non-null, will be shut down during tearDown(). However, it _must_ have been started with 144 * {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following 145 * tests in an indeterminate state. 146 */ 147 private InternalServer server; 148 private ServerTransport serverTransport; 149 private ManagedClientTransport client; 150 private MethodDescriptor<String, String> methodDescriptor = 151 MethodDescriptor.<String, String>newBuilder() 152 .setType(MethodDescriptor.MethodType.UNKNOWN) 153 .setFullMethodName("service/method") 154 .setRequestMarshaller(StringMarshaller.INSTANCE) 155 .setResponseMarshaller(StringMarshaller.INSTANCE) 156 .build(); 157 private CallOptions callOptions; 158 159 private Metadata.Key<String> asciiKey = Metadata.Key.of( 160 "ascii-key", Metadata.ASCII_STRING_MARSHALLER); 161 private Metadata.Key<String> binaryKey = Metadata.Key.of( 162 "key-bin", StringBinaryMarshaller.INSTANCE); 163 164 private ManagedClientTransport.Listener mockClientTransportListener 165 = mock(ManagedClientTransport.Listener.class); 166 private MockServerListener serverListener = new MockServerListener(); 167 private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class); 168 private final ClientStreamTracer.Factory clientStreamTracerFactory = 169 mock(ClientStreamTracer.Factory.class); 170 171 private final TestClientStreamTracer clientStreamTracer1 = new TestClientStreamTracer(); 172 private final TestClientStreamTracer clientStreamTracer2 = new TestClientStreamTracer(); 173 private final ServerStreamTracer.Factory serverStreamTracerFactory = 174 mock(ServerStreamTracer.Factory.class); 175 private final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer(); 176 private final TestServerStreamTracer serverStreamTracer2 = new TestServerStreamTracer(); 177 178 @Rule 179 public ExpectedException thrown = ExpectedException.none(); 180 181 @Before setUp()182 public void setUp() { 183 server = newServer(Arrays.asList(serverStreamTracerFactory)); 184 when(clientStreamTracerFactory 185 .newClientStreamTracer(any(CallOptions.class), any(Metadata.class))) 186 .thenReturn(clientStreamTracer1) 187 .thenReturn(clientStreamTracer2); 188 when(serverStreamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) 189 .thenReturn(serverStreamTracer1) 190 .thenReturn(serverStreamTracer2); 191 callOptions = CallOptions.DEFAULT.withStreamTracerFactory(clientStreamTracerFactory); 192 } 193 194 @After tearDown()195 public void tearDown() throws InterruptedException { 196 if (client != null) { 197 client.shutdownNow(Status.UNKNOWN.withDescription("teardown")); 198 } 199 if (serverTransport != null) { 200 serverTransport.shutdownNow(Status.UNKNOWN.withDescription("teardown")); 201 } 202 if (server != null) { 203 server.shutdown(); 204 assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 205 } 206 } 207 208 /** 209 * Moves the clock forward, for tests that require moving the clock forward. It is the transport 210 * subclass's responsibility to implement this method. 211 */ advanceClock(long offset, TimeUnit unit)212 protected void advanceClock(long offset, TimeUnit unit) { 213 throw new UnsupportedOperationException(); 214 } 215 216 /** 217 * Returns the current time, for tests that rely on the clock. 218 */ fakeCurrentTimeNanos()219 protected long fakeCurrentTimeNanos() { 220 throw new UnsupportedOperationException(); 221 } 222 223 // TODO(ejona): 224 // multiple streams on same transport 225 // multiple client transports to same server 226 // halfClose to trigger flush (client and server) 227 // flow control pushes back (client and server) 228 // flow control provides precisely number of messages requested (client and server) 229 // onReady called when buffer drained (on server and client) 230 // test no start reentrancy (esp. during failure) (transport and call) 231 // multiple requests/responses (verifying contents received) 232 // server transport shutdown triggers client shutdown (via GOAWAY) 233 // queued message InputStreams are closed on stream cancel 234 // (and maybe exceptions handled) 235 236 /** 237 * Test for issue https://github.com/grpc/grpc-java/issues/1682 238 */ 239 @Test frameAfterRstStreamShouldNotBreakClientChannel()240 public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception { 241 server.start(serverListener); 242 client = newClientTransport(server); 243 startTransport(client, mockClientTransportListener); 244 MockServerTransportListener serverTransportListener 245 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 246 serverTransport = serverTransportListener.transport; 247 248 // Try to create a sequence of frames so that the client receives a HEADERS or DATA frame 249 // after having sent a RST_STREAM to the server. Previously, this would have broken the 250 // Netty channel. 251 252 ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); 253 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 254 stream.start(clientStreamListener); 255 StreamCreation serverStreamCreation 256 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 257 stream.flush(); 258 stream.writeMessage(methodDescriptor.streamRequest("foo")); 259 stream.flush(); 260 stream.cancel(Status.CANCELLED); 261 stream.flush(); 262 serverStreamCreation.stream.writeHeaders(new Metadata()); 263 serverStreamCreation.stream.flush(); 264 serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("bar")); 265 serverStreamCreation.stream.flush(); 266 267 assertEquals( 268 Status.CANCELLED, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 269 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 270 271 ClientStreamListener mockClientStreamListener2 = mock(ClientStreamListener.class); 272 273 // Test that the channel is still usable i.e. we can receive headers from the server on a 274 // new stream. 275 stream = client.newStream(methodDescriptor, new Metadata(), callOptions); 276 stream.start(mockClientStreamListener2); 277 serverStreamCreation 278 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 279 serverStreamCreation.stream.writeHeaders(new Metadata()); 280 serverStreamCreation.stream.flush(); 281 282 verify(mockClientStreamListener2, timeout(250)).headersRead(any(Metadata.class)); 283 } 284 285 @Test serverNotListening()286 public void serverNotListening() throws Exception { 287 // Start server to just acquire a port. 288 server.start(serverListener); 289 client = newClientTransport(server); 290 server.shutdown(); 291 assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 292 server = null; 293 294 InOrder inOrder = inOrder(mockClientTransportListener); 295 runIfNotNull(client.start(mockClientTransportListener)); 296 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 297 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 298 inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); 299 assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); 300 inOrder.verify(mockClientTransportListener).transportTerminated(); 301 verify(mockClientTransportListener, never()).transportReady(); 302 verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); 303 } 304 305 @Test clientStartStop()306 public void clientStartStop() throws Exception { 307 server.start(serverListener); 308 client = newClientTransport(server); 309 InOrder inOrder = inOrder(mockClientTransportListener); 310 startTransport(client, mockClientTransportListener); 311 Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); 312 client.shutdown(shutdownReason); 313 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 314 inOrder.verify(mockClientTransportListener).transportShutdown(same(shutdownReason)); 315 inOrder.verify(mockClientTransportListener).transportTerminated(); 316 verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); 317 } 318 319 @Test clientStartAndStopOnceConnected()320 public void clientStartAndStopOnceConnected() throws Exception { 321 server.start(serverListener); 322 client = newClientTransport(server); 323 InOrder inOrder = inOrder(mockClientTransportListener); 324 startTransport(client, mockClientTransportListener); 325 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); 326 MockServerTransportListener serverTransportListener 327 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 328 client.shutdown(Status.UNAVAILABLE); 329 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 330 inOrder.verify(mockClientTransportListener).transportShutdown(any(Status.class)); 331 inOrder.verify(mockClientTransportListener).transportTerminated(); 332 assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 333 server.shutdown(); 334 assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 335 server = null; 336 verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); 337 } 338 339 @Test checkClientAttributes()340 public void checkClientAttributes() throws Exception { 341 server.start(serverListener); 342 client = newClientTransport(server); 343 assumeTrue(client instanceof ConnectionClientTransport); 344 ConnectionClientTransport connectionClient = (ConnectionClientTransport) client; 345 startTransport(connectionClient, mockClientTransportListener); 346 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); 347 348 assertNotNull("security level should be set in client attributes", 349 connectionClient.getAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)); 350 } 351 352 @Test serverAlreadyListening()353 public void serverAlreadyListening() throws Exception { 354 client = null; 355 server.start(serverListener); 356 InternalServer server2 = newServer(server, Arrays.asList(serverStreamTracerFactory)); 357 thrown.expect(IOException.class); 358 server2.start(new MockServerListener()); 359 } 360 361 @Test openStreamPreventsTermination()362 public void openStreamPreventsTermination() throws Exception { 363 server.start(serverListener); 364 client = newClientTransport(server); 365 startTransport(client, mockClientTransportListener); 366 MockServerTransportListener serverTransportListener 367 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 368 serverTransport = serverTransportListener.transport; 369 370 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 371 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 372 clientStream.start(clientStreamListener); 373 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); 374 StreamCreation serverStreamCreation 375 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 376 ServerStream serverStream = serverStreamCreation.stream; 377 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 378 379 client.shutdown(Status.UNAVAILABLE); 380 client = null; 381 server.shutdown(); 382 serverTransport.shutdown(); 383 serverTransport = null; 384 385 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); 386 assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 387 388 // A new server should be able to start listening, since the current server has given up 389 // resources. There may be cases this is impossible in the future, but for now it is a useful 390 // property. 391 serverListener = new MockServerListener(); 392 server = newServer(server, Arrays.asList(serverStreamTracerFactory)); 393 server.start(serverListener); 394 395 // Try to "flush" out any listener notifications on client and server. This also ensures that 396 // the stream still functions. 397 serverStream.writeHeaders(new Metadata()); 398 clientStream.halfClose(); 399 assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 400 assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 401 402 verify(mockClientTransportListener, never()).transportTerminated(); 403 verify(mockClientTransportListener, never()).transportInUse(false); 404 assertFalse(serverTransportListener.isTerminated()); 405 406 clientStream.cancel(Status.CANCELLED); 407 408 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 409 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); 410 assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 411 } 412 413 @Test shutdownNowKillsClientStream()414 public void shutdownNowKillsClientStream() throws Exception { 415 server.start(serverListener); 416 client = newClientTransport(server); 417 startTransport(client, mockClientTransportListener); 418 MockServerTransportListener serverTransportListener 419 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 420 serverTransport = serverTransportListener.transport; 421 422 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 423 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 424 clientStream.start(clientStreamListener); 425 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); 426 StreamCreation serverStreamCreation 427 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 428 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 429 430 Status status = Status.UNKNOWN.withDescription("test shutdownNow"); 431 client.shutdownNow(status); 432 client = null; 433 434 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); 435 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 436 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); 437 assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 438 assertTrue(serverTransportListener.isTerminated()); 439 440 assertEquals(status, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 441 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 442 Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 443 assertFalse(serverStatus.isOk()); 444 assertTrue(clientStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 445 assertStatusEquals(status, clientStreamTracer1.getStatus()); 446 assertTrue(serverStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 447 assertStatusEquals(serverStatus, serverStreamTracer1.getStatus()); 448 } 449 450 @Test shutdownNowKillsServerStream()451 public void shutdownNowKillsServerStream() throws Exception { 452 server.start(serverListener); 453 client = newClientTransport(server); 454 startTransport(client, mockClientTransportListener); 455 MockServerTransportListener serverTransportListener 456 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 457 serverTransport = serverTransportListener.transport; 458 459 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 460 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 461 clientStream.start(clientStreamListener); 462 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); 463 StreamCreation serverStreamCreation 464 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 465 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 466 467 Status shutdownStatus = Status.UNKNOWN.withDescription("test shutdownNow"); 468 serverTransport.shutdownNow(shutdownStatus); 469 serverTransport = null; 470 471 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); 472 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 473 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); 474 assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 475 assertTrue(serverTransportListener.isTerminated()); 476 477 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 478 assertFalse(clientStreamStatus.isOk()); 479 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 480 assertTrue(clientStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 481 assertStatusEquals(clientStreamStatus, clientStreamTracer1.getStatus()); 482 assertTrue(serverStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 483 assertStatusEquals(shutdownStatus, serverStreamTracer1.getStatus()); 484 485 // Generally will be same status provided to shutdownNow, but InProcessTransport can't 486 // differentiate between client and server shutdownNow. The status is not really used on 487 // server-side, so we don't care much. 488 assertNotNull(serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 489 } 490 491 @Test ping()492 public void ping() throws Exception { 493 server.start(serverListener); 494 client = newClientTransport(server); 495 startTransport(client, mockClientTransportListener); 496 ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); 497 try { 498 client.ping(mockPingCallback, MoreExecutors.directExecutor()); 499 } catch (UnsupportedOperationException ex) { 500 // Transport doesn't support ping, so this neither passes nor fails. 501 assumeTrue(false); 502 } 503 verify(mockPingCallback, timeout(TIMEOUT_MS)).onSuccess(Matchers.anyLong()); 504 verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); 505 } 506 507 @Test ping_duringShutdown()508 public void ping_duringShutdown() throws Exception { 509 server.start(serverListener); 510 client = newClientTransport(server); 511 startTransport(client, mockClientTransportListener); 512 // Stream prevents termination 513 ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); 514 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 515 stream.start(clientStreamListener); 516 client.shutdown(Status.UNAVAILABLE); 517 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); 518 ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); 519 try { 520 client.ping(mockPingCallback, MoreExecutors.directExecutor()); 521 } catch (UnsupportedOperationException ex) { 522 // Transport doesn't support ping, so this neither passes nor fails. 523 assumeTrue(false); 524 } 525 verify(mockPingCallback, timeout(TIMEOUT_MS)).onSuccess(Matchers.anyLong()); 526 stream.cancel(Status.CANCELLED); 527 } 528 529 @Test ping_afterTermination()530 public void ping_afterTermination() throws Exception { 531 server.start(serverListener); 532 client = newClientTransport(server); 533 startTransport(client, mockClientTransportListener); 534 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); 535 Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); 536 client.shutdown(shutdownReason); 537 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 538 ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); 539 try { 540 client.ping(mockPingCallback, MoreExecutors.directExecutor()); 541 } catch (UnsupportedOperationException ex) { 542 // Transport doesn't support ping, so this neither passes nor fails. 543 assumeTrue(false); 544 } 545 verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture()); 546 Status status = Status.fromThrowable(throwableCaptor.getValue()); 547 assertSame(shutdownReason, status); 548 } 549 550 @Test newStream_duringShutdown()551 public void newStream_duringShutdown() throws Exception { 552 InOrder inOrder = inOrder(clientStreamTracerFactory); 553 server.start(serverListener); 554 client = newClientTransport(server); 555 startTransport(client, mockClientTransportListener); 556 // Stream prevents termination 557 ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); 558 inOrder.verify(clientStreamTracerFactory).newClientStreamTracer( 559 any(CallOptions.class), any(Metadata.class)); 560 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 561 stream.start(clientStreamListener); 562 client.shutdown(Status.UNAVAILABLE); 563 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); 564 565 ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions); 566 inOrder.verify(clientStreamTracerFactory).newClientStreamTracer( 567 any(CallOptions.class), any(Metadata.class)); 568 ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase(); 569 stream2.start(clientStreamListener2); 570 Status clientStreamStatus2 = 571 clientStreamListener2.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 572 assertNotNull(clientStreamListener2.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 573 assertCodeEquals(Status.UNAVAILABLE, clientStreamStatus2); 574 assertSame(clientStreamStatus2, clientStreamTracer2.getStatus()); 575 576 // Make sure earlier stream works. 577 MockServerTransportListener serverTransportListener 578 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 579 serverTransport = serverTransportListener.transport; 580 // TODO(zdapeng): Increased timeout to 20 seconds to see if flakiness of #2328 persists. Take 581 // further action after sufficient observation. 582 StreamCreation serverStreamCreation 583 = serverTransportListener.takeStreamOrFail(20 * TIMEOUT_MS, TimeUnit.MILLISECONDS); 584 serverStreamCreation.stream.close(Status.OK, new Metadata()); 585 assertCodeEquals(Status.OK, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 586 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 587 } 588 589 @Test newStream_afterTermination()590 public void newStream_afterTermination() throws Exception { 591 // We expect the same general behavior as duringShutdown, but for some transports (e.g., Netty) 592 // dealing with afterTermination is harder than duringShutdown. 593 server.start(serverListener); 594 client = newClientTransport(server); 595 startTransport(client, mockClientTransportListener); 596 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); 597 Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); 598 client.shutdown(shutdownReason); 599 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); 600 Thread.sleep(100); 601 ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); 602 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 603 stream.start(clientStreamListener); 604 assertEquals( 605 shutdownReason, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 606 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 607 verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); 608 verify(clientStreamTracerFactory).newClientStreamTracer( 609 any(CallOptions.class), any(Metadata.class)); 610 assertSame(shutdownReason, clientStreamTracer1.getStatus()); 611 // Assert no interactions 612 assertNull(serverStreamTracer1.getServerCallInfo()); 613 } 614 615 @Test transportInUse_normalClose()616 public void transportInUse_normalClose() throws Exception { 617 server.start(serverListener); 618 client = newClientTransport(server); 619 startTransport(client, mockClientTransportListener); 620 ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions); 621 ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); 622 stream1.start(clientStreamListener1); 623 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); 624 MockServerTransportListener serverTransportListener 625 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 626 StreamCreation serverStreamCreation1 627 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 628 ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions); 629 ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase(); 630 stream2.start(clientStreamListener2); 631 StreamCreation serverStreamCreation2 632 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 633 634 stream1.halfClose(); 635 serverStreamCreation1.stream.close(Status.OK, new Metadata()); 636 stream2.halfClose(); 637 verify(mockClientTransportListener, never()).transportInUse(false); 638 serverStreamCreation2.stream.close(Status.OK, new Metadata()); 639 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); 640 // Verify that the callback has been called only once for true and false respectively 641 verify(mockClientTransportListener).transportInUse(true); 642 verify(mockClientTransportListener).transportInUse(false); 643 } 644 645 @Test transportInUse_clientCancel()646 public void transportInUse_clientCancel() throws Exception { 647 server.start(serverListener); 648 client = newClientTransport(server); 649 startTransport(client, mockClientTransportListener); 650 ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions); 651 ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); 652 stream1.start(clientStreamListener1); 653 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true); 654 ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions); 655 ClientStreamListenerBase clientStreamListener2 = new ClientStreamListenerBase(); 656 stream2.start(clientStreamListener2); 657 658 stream1.cancel(Status.CANCELLED); 659 verify(mockClientTransportListener, never()).transportInUse(false); 660 stream2.cancel(Status.CANCELLED); 661 verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); 662 // Verify that the callback has been called only once for true and false respectively 663 verify(mockClientTransportListener).transportInUse(true); 664 verify(mockClientTransportListener).transportInUse(false); 665 } 666 667 @Test 668 @SuppressWarnings("deprecation") basicStream()669 public void basicStream() throws Exception { 670 InOrder clientInOrder = inOrder(clientStreamTracerFactory); 671 InOrder serverInOrder = inOrder(serverStreamTracerFactory); 672 server.start(serverListener); 673 client = newClientTransport(server); 674 startTransport(client, mockClientTransportListener); 675 MockServerTransportListener serverTransportListener 676 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 677 serverTransport = serverTransportListener.transport; 678 679 Metadata clientHeaders = new Metadata(); 680 clientHeaders.put(asciiKey, "client"); 681 clientHeaders.put(asciiKey, "dupvalue"); 682 clientHeaders.put(asciiKey, "dupvalue"); 683 clientHeaders.put(binaryKey, "äbinaryclient"); 684 Metadata clientHeadersCopy = new Metadata(); 685 686 clientHeadersCopy.merge(clientHeaders); 687 ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders, callOptions); 688 clientInOrder.verify(clientStreamTracerFactory).newClientStreamTracer( 689 same(callOptions), same(clientHeaders)); 690 691 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 692 clientStream.start(clientStreamListener); 693 StreamCreation serverStreamCreation 694 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 695 assertTrue(clientStreamTracer1.awaitOutboundHeaders(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 696 assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); 697 assertEquals(Lists.newArrayList(clientHeadersCopy.getAll(asciiKey)), 698 Lists.newArrayList(serverStreamCreation.headers.getAll(asciiKey))); 699 assertEquals(Lists.newArrayList(clientHeadersCopy.getAll(binaryKey)), 700 Lists.newArrayList(serverStreamCreation.headers.getAll(binaryKey))); 701 ServerStream serverStream = serverStreamCreation.stream; 702 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 703 704 serverInOrder.verify(serverStreamTracerFactory).newServerStreamTracer( 705 eq(methodDescriptor.getFullMethodName()), any(Metadata.class)); 706 707 assertEquals("additional attribute value", 708 serverStream.getAttributes().get(ADDITIONAL_TRANSPORT_ATTR_KEY)); 709 assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); 710 assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR)); 711 712 serverStream.request(1); 713 assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 714 assertTrue(clientStream.isReady()); 715 clientStream.writeMessage(methodDescriptor.streamRequest("Hello!")); 716 assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); 717 718 clientStream.flush(); 719 InputStream message = serverStreamListener.messageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); 720 assertEquals("Hello!", methodDescriptor.parseRequest(message)); 721 message.close(); 722 assertThat(clientStreamTracer1.nextOutboundEvent()) 723 .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); 724 if (sizesReported()) { 725 assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); 726 assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); 727 } else { 728 assertThat(clientStreamTracer1.getOutboundWireSize()).isEqualTo(0L); 729 assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L); 730 } 731 assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); 732 assertNull("no additional message expected", serverStreamListener.messageQueue.poll()); 733 734 clientStream.halfClose(); 735 assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 736 737 if (sizesReported()) { 738 assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L); 739 assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); 740 } else { 741 assertThat(serverStreamTracer1.getInboundWireSize()).isEqualTo(0L); 742 assertThat(serverStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L); 743 } 744 assertThat(serverStreamTracer1.nextInboundEvent()) 745 .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); 746 747 Metadata serverHeaders = new Metadata(); 748 serverHeaders.put(asciiKey, "server"); 749 serverHeaders.put(asciiKey, "dupvalue"); 750 serverHeaders.put(asciiKey, "dupvalue"); 751 serverHeaders.put(binaryKey, "äbinaryserver"); 752 Metadata serverHeadersCopy = new Metadata(); 753 serverHeadersCopy.merge(serverHeaders); 754 serverStream.writeHeaders(serverHeaders); 755 Metadata headers = clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 756 assertNotNull(headers); 757 assertEquals( 758 Lists.newArrayList(serverHeadersCopy.getAll(asciiKey)), 759 Lists.newArrayList(headers.getAll(asciiKey))); 760 assertEquals( 761 Lists.newArrayList(serverHeadersCopy.getAll(binaryKey)), 762 Lists.newArrayList(headers.getAll(binaryKey))); 763 764 clientStream.request(1); 765 assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 766 assertTrue(serverStream.isReady()); 767 serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?")); 768 assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); 769 770 serverStream.flush(); 771 message = clientStreamListener.messageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); 772 assertNotNull("message expected", message); 773 assertThat(serverStreamTracer1.nextOutboundEvent()) 774 .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); 775 if (sizesReported()) { 776 assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); 777 assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); 778 } else { 779 assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L); 780 assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L); 781 } 782 assertTrue(clientStreamTracer1.getInboundHeaders()); 783 assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); 784 assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message)); 785 assertThat(clientStreamTracer1.nextInboundEvent()) 786 .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); 787 if (sizesReported()) { 788 assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); 789 assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); 790 } else { 791 assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L); 792 assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L); 793 } 794 795 message.close(); 796 assertNull("no additional message expected", clientStreamListener.messageQueue.poll()); 797 798 Status status = Status.OK.withDescription("That was normal"); 799 Metadata trailers = new Metadata(); 800 trailers.put(asciiKey, "trailers"); 801 trailers.put(asciiKey, "dupvalue"); 802 trailers.put(asciiKey, "dupvalue"); 803 trailers.put(binaryKey, "äbinarytrailers"); 804 serverStream.close(status, trailers); 805 assertNull(serverStreamTracer1.nextInboundEvent()); 806 assertNull(serverStreamTracer1.nextOutboundEvent()); 807 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 808 assertSame(status, serverStreamTracer1.getStatus()); 809 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 810 Metadata clientStreamTrailers = 811 clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 812 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 813 assertNull(clientStreamTracer1.nextInboundEvent()); 814 assertNull(clientStreamTracer1.nextOutboundEvent()); 815 assertEquals(status.getCode(), clientStreamStatus.getCode()); 816 assertEquals(status.getDescription(), clientStreamStatus.getDescription()); 817 assertEquals( 818 Lists.newArrayList(trailers.getAll(asciiKey)), 819 Lists.newArrayList(clientStreamTrailers.getAll(asciiKey))); 820 assertEquals( 821 Lists.newArrayList(trailers.getAll(binaryKey)), 822 Lists.newArrayList(clientStreamTrailers.getAll(binaryKey))); 823 } 824 825 @Test 826 @SuppressWarnings("deprecation") authorityPropagation()827 public void authorityPropagation() throws Exception { 828 server.start(serverListener); 829 client = newClientTransport(server); 830 startTransport(client, mockClientTransportListener); 831 MockServerTransportListener serverTransportListener 832 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 833 834 Metadata clientHeaders = new Metadata(); 835 ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders, callOptions); 836 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 837 clientStream.start(clientStreamListener); 838 StreamCreation serverStreamCreation 839 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 840 ServerStream serverStream = serverStreamCreation.stream; 841 842 assertEquals(testAuthority(server), serverStream.getAuthority()); 843 } 844 845 @Test zeroMessageStream()846 public void zeroMessageStream() throws Exception { 847 server.start(serverListener); 848 client = newClientTransport(server); 849 startTransport(client, mockClientTransportListener); 850 MockServerTransportListener serverTransportListener 851 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 852 serverTransport = serverTransportListener.transport; 853 854 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 855 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 856 clientStream.start(clientStreamListener); 857 StreamCreation serverStreamCreation 858 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 859 ServerStream serverStream = serverStreamCreation.stream; 860 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 861 862 clientStream.halfClose(); 863 assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 864 865 serverStream.writeHeaders(new Metadata()); 866 assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 867 868 Status status = Status.OK.withDescription("Nice talking to you"); 869 serverStream.close(status, new Metadata()); 870 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 871 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 872 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 873 assertEquals(status.getCode(), clientStreamStatus.getCode()); 874 assertEquals(status.getDescription(), clientStreamStatus.getDescription()); 875 assertTrue(clientStreamTracer1.getOutboundHeaders()); 876 assertTrue(clientStreamTracer1.getInboundHeaders()); 877 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 878 assertSame(status, serverStreamTracer1.getStatus()); 879 } 880 881 @Test earlyServerClose_withServerHeaders()882 public void earlyServerClose_withServerHeaders() throws Exception { 883 server.start(serverListener); 884 client = newClientTransport(server); 885 startTransport(client, mockClientTransportListener); 886 MockServerTransportListener serverTransportListener 887 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 888 serverTransport = serverTransportListener.transport; 889 890 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 891 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 892 clientStream.start(clientStreamListener); 893 StreamCreation serverStreamCreation 894 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 895 ServerStream serverStream = serverStreamCreation.stream; 896 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 897 898 serverStream.writeHeaders(new Metadata()); 899 assertNotNull(clientStreamListener.headers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 900 901 Status strippedStatus = Status.OK.withDescription("Hello. Goodbye."); 902 Status status = strippedStatus.withCause(new Exception()); 903 serverStream.close(status, new Metadata()); 904 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 905 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 906 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 907 assertEquals(status.getCode(), clientStreamStatus.getCode()); 908 assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription()); 909 assertNull(clientStreamStatus.getCause()); 910 assertTrue(clientStreamTracer1.getOutboundHeaders()); 911 assertTrue(clientStreamTracer1.getInboundHeaders()); 912 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 913 assertSame(status, serverStreamTracer1.getStatus()); 914 } 915 916 @Test earlyServerClose_noServerHeaders()917 public void earlyServerClose_noServerHeaders() throws Exception { 918 server.start(serverListener); 919 client = newClientTransport(server); 920 startTransport(client, mockClientTransportListener); 921 MockServerTransportListener serverTransportListener 922 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 923 serverTransport = serverTransportListener.transport; 924 925 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 926 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 927 clientStream.start(clientStreamListener); 928 StreamCreation serverStreamCreation 929 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 930 ServerStream serverStream = serverStreamCreation.stream; 931 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 932 933 Status strippedStatus = Status.OK.withDescription("Hellogoodbye"); 934 Status status = strippedStatus.withCause(new Exception()); 935 Metadata trailers = new Metadata(); 936 trailers.put(asciiKey, "trailers"); 937 trailers.put(asciiKey, "dupvalue"); 938 trailers.put(asciiKey, "dupvalue"); 939 trailers.put(binaryKey, "äbinarytrailers"); 940 serverStream.close(status, trailers); 941 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 942 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 943 Metadata clientStreamTrailers = 944 clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 945 assertEquals(status.getCode(), clientStreamStatus.getCode()); 946 assertEquals("Hellogoodbye", clientStreamStatus.getDescription()); 947 // Cause should not be transmitted to the client. 948 assertNull(clientStreamStatus.getCause()); 949 assertEquals( 950 Lists.newArrayList(trailers.getAll(asciiKey)), 951 Lists.newArrayList(clientStreamTrailers.getAll(asciiKey))); 952 assertEquals( 953 Lists.newArrayList(trailers.getAll(binaryKey)), 954 Lists.newArrayList(clientStreamTrailers.getAll(binaryKey))); 955 assertTrue(clientStreamTracer1.getOutboundHeaders()); 956 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 957 assertSame(status, serverStreamTracer1.getStatus()); 958 } 959 960 @Test earlyServerClose_serverFailure()961 public void earlyServerClose_serverFailure() throws Exception { 962 server.start(serverListener); 963 client = newClientTransport(server); 964 startTransport(client, mockClientTransportListener); 965 MockServerTransportListener serverTransportListener 966 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 967 serverTransport = serverTransportListener.transport; 968 969 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 970 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 971 clientStream.start(clientStreamListener); 972 StreamCreation serverStreamCreation 973 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 974 ServerStream serverStream = serverStreamCreation.stream; 975 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 976 977 Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening"); 978 Status status = strippedStatus.withCause(new Exception()); 979 serverStream.close(status, new Metadata()); 980 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 981 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 982 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 983 assertEquals(status.getCode(), clientStreamStatus.getCode()); 984 assertEquals(status.getDescription(), clientStreamStatus.getDescription()); 985 assertNull(clientStreamStatus.getCause()); 986 assertTrue(clientStreamTracer1.getOutboundHeaders()); 987 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 988 assertSame(status, serverStreamTracer1.getStatus()); 989 } 990 991 @Test earlyServerClose_serverFailure_withClientCancelOnListenerClosed()992 public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() throws Exception { 993 server.start(serverListener); 994 client = newClientTransport(server); 995 runIfNotNull(client.start(mockClientTransportListener)); 996 MockServerTransportListener serverTransportListener 997 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 998 serverTransport = serverTransportListener.transport; 999 1000 final ClientStream clientStream = 1001 client.newStream(methodDescriptor, new Metadata(), callOptions); 1002 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase() { 1003 @Override 1004 public void closed(Status status, Metadata trailers) { 1005 super.closed(status, trailers); 1006 // This simulates the blocking calls which can trigger clientStream.cancel(). 1007 clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException())); 1008 } 1009 1010 @Override 1011 public void closed( 1012 Status status, RpcProgress rpcProgress, Metadata trailers) { 1013 super.closed(status, rpcProgress, trailers); 1014 // This simulates the blocking calls which can trigger clientStream.cancel(). 1015 clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException())); 1016 } 1017 }; 1018 clientStream.start(clientStreamListener); 1019 StreamCreation serverStreamCreation 1020 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1021 ServerStream serverStream = serverStreamCreation.stream; 1022 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1023 1024 Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening"); 1025 Status status = strippedStatus.withCause(new Exception()); 1026 serverStream.close(status, new Metadata()); 1027 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1028 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1029 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1030 assertEquals(status.getCode(), clientStreamStatus.getCode()); 1031 assertEquals(status.getDescription(), clientStreamStatus.getDescription()); 1032 assertNull(clientStreamStatus.getCause()); 1033 assertTrue(clientStreamTracer1.getOutboundHeaders()); 1034 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 1035 assertSame(status, serverStreamTracer1.getStatus()); 1036 } 1037 1038 @Test clientCancel()1039 public void clientCancel() throws Exception { 1040 server.start(serverListener); 1041 client = newClientTransport(server); 1042 startTransport(client, mockClientTransportListener); 1043 MockServerTransportListener serverTransportListener 1044 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1045 serverTransport = serverTransportListener.transport; 1046 1047 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1048 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1049 clientStream.start(clientStreamListener); 1050 StreamCreation serverStreamCreation 1051 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1052 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1053 1054 Status status = Status.CANCELLED.withDescription("Nevermind").withCause(new Exception()); 1055 clientStream.cancel(status); 1056 assertEquals(status, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1057 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1058 Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1059 assertNotEquals(Status.Code.OK, serverStatus.getCode()); 1060 // Cause should not be transmitted between client and server 1061 assertNull(serverStatus.getCause()); 1062 1063 clientStream.cancel(status); 1064 assertTrue(clientStreamTracer1.getOutboundHeaders()); 1065 assertSame(status, clientStreamTracer1.getStatus()); 1066 assertSame(serverStatus, serverStreamTracer1.getStatus()); 1067 } 1068 1069 @Test clientCancelFromWithinMessageRead()1070 public void clientCancelFromWithinMessageRead() throws Exception { 1071 server.start(serverListener); 1072 client = newClientTransport(server); 1073 startTransport(client, mockClientTransportListener); 1074 MockServerTransportListener serverTransportListener 1075 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1076 serverTransport = serverTransportListener.transport; 1077 1078 final SettableFuture<Boolean> closedCalled = SettableFuture.create(); 1079 final ClientStream clientStream = 1080 client.newStream(methodDescriptor, new Metadata(), callOptions); 1081 final Status status = Status.CANCELLED.withDescription("nevermind"); 1082 clientStream.start(new ClientStreamListener() { 1083 private boolean messageReceived = false; 1084 1085 @Override 1086 public void headersRead(Metadata headers) { 1087 } 1088 1089 @Override 1090 public void closed(Status status, Metadata trailers) { 1091 closed(status, RpcProgress.PROCESSED, trailers); 1092 } 1093 1094 @Override 1095 public void closed( 1096 Status status, RpcProgress rpcProgress, Metadata trailers) { 1097 assertEquals(Status.CANCELLED.getCode(), status.getCode()); 1098 assertEquals("nevermind", status.getDescription()); 1099 closedCalled.set(true); 1100 } 1101 1102 @Override 1103 public void messagesAvailable(MessageProducer producer) { 1104 InputStream message; 1105 while ((message = producer.next()) != null) { 1106 assertFalse("too many messages received", messageReceived); 1107 messageReceived = true; 1108 assertEquals("foo", methodDescriptor.parseResponse(message)); 1109 clientStream.cancel(status); 1110 } 1111 } 1112 1113 @Override 1114 public void onReady() { 1115 } 1116 }); 1117 clientStream.halfClose(); 1118 clientStream.request(1); 1119 1120 StreamCreation serverStreamCreation 1121 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1122 assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); 1123 ServerStream serverStream = serverStreamCreation.stream; 1124 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1125 assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1126 1127 assertTrue(serverStream.isReady()); 1128 serverStream.writeHeaders(new Metadata()); 1129 serverStream.writeMessage(methodDescriptor.streamRequest("foo")); 1130 serverStream.flush(); 1131 1132 // Block until closedCalled was set. 1133 closedCalled.get(5, TimeUnit.SECONDS); 1134 1135 serverStream.close(Status.OK, new Metadata()); 1136 assertTrue(clientStreamTracer1.getOutboundHeaders()); 1137 assertTrue(clientStreamTracer1.getInboundHeaders()); 1138 if (sizesReported()) { 1139 assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); 1140 assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); 1141 assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); 1142 assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); 1143 } else { 1144 assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L); 1145 assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L); 1146 assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L); 1147 assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L); 1148 } 1149 assertSame(status, clientStreamTracer1.getStatus()); 1150 // There is a race between client cancelling and server closing. The final status seen by the 1151 // server is non-deterministic. 1152 assertTrue(serverStreamTracer1.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1153 assertNotNull(serverStreamTracer1.getStatus()); 1154 } 1155 1156 @Test serverCancel()1157 public void serverCancel() throws Exception { 1158 server.start(serverListener); 1159 client = newClientTransport(server); 1160 startTransport(client, mockClientTransportListener); 1161 MockServerTransportListener serverTransportListener 1162 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1163 serverTransport = serverTransportListener.transport; 1164 1165 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1166 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1167 clientStream.start(clientStreamListener); 1168 StreamCreation serverStreamCreation 1169 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1170 ServerStream serverStream = serverStreamCreation.stream; 1171 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1172 1173 Status status = Status.DEADLINE_EXCEEDED.withDescription("It was bound to happen") 1174 .withCause(new Exception()); 1175 serverStream.cancel(status); 1176 assertEquals(status, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1177 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1178 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1179 // Presently we can't sent much back to the client in this case. Verify that is the current 1180 // behavior for consistency between transports. 1181 assertCodeEquals(Status.CANCELLED, clientStreamStatus); 1182 // Cause should not be transmitted between server and client 1183 assertNull(clientStreamStatus.getCause()); 1184 1185 verify(clientStreamTracerFactory).newClientStreamTracer( 1186 any(CallOptions.class), any(Metadata.class)); 1187 assertTrue(clientStreamTracer1.getOutboundHeaders()); 1188 assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); 1189 verify(serverStreamTracerFactory).newServerStreamTracer(anyString(), any(Metadata.class)); 1190 assertSame(status, serverStreamTracer1.getStatus()); 1191 1192 // Second cancellation shouldn't trigger additional callbacks 1193 serverStream.cancel(status); 1194 doPingPong(serverListener); 1195 } 1196 1197 @Test flowControlPushBack()1198 public void flowControlPushBack() throws Exception { 1199 // This test tries to create more streams than the number of distinctive stream tracers that the 1200 // mock factory will return. This causes the last stream tracer to be returned for more than 1201 // one streams, resulting in duplicate callbacks. Since we don't care the stream tracers in 1202 // this test, we just disable the check. 1203 clientStreamTracer2.setFailDuplicateCallbacks(false); 1204 serverStreamTracer2.setFailDuplicateCallbacks(false); 1205 1206 server.start(serverListener); 1207 client = newClientTransport(server); 1208 startTransport(client, mockClientTransportListener); 1209 MockServerTransportListener serverTransportListener = 1210 serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1211 serverTransport = serverTransportListener.transport; 1212 1213 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1214 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1215 clientStream.start(clientStreamListener); 1216 StreamCreation serverStreamCreation = 1217 serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1218 assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); 1219 ServerStream serverStream = serverStreamCreation.stream; 1220 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1221 1222 serverStream.writeHeaders(new Metadata()); 1223 1224 String largeMessage; 1225 { 1226 int size = 1 * 1024; 1227 StringBuilder sb = new StringBuilder(size); 1228 for (int i = 0; i < size; i++) { 1229 sb.append('a'); 1230 } 1231 largeMessage = sb.toString(); 1232 } 1233 1234 serverStream.request(1); 1235 assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1236 assertTrue(clientStream.isReady()); 1237 final int maxToSend = 10 * 1024; 1238 int clientSent; 1239 // Verify that flow control will push back on client. 1240 for (clientSent = 0; clientStream.isReady(); clientSent++) { 1241 if (clientSent > maxToSend) { 1242 // It seems like flow control isn't working. _Surely_ flow control would have pushed-back 1243 // already. If this is normal, please configure the transport to buffer less. 1244 fail("Too many messages sent before isReady() returned false"); 1245 } 1246 clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage)); 1247 clientStream.flush(); 1248 } 1249 assertTrue(clientSent > 0); 1250 // Make sure there are at least a few messages buffered. 1251 for (; clientSent < 5; clientSent++) { 1252 clientStream.writeMessage(methodDescriptor.streamResponse(largeMessage)); 1253 clientStream.flush(); 1254 } 1255 doPingPong(serverListener); 1256 1257 int serverReceived = verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); 1258 1259 clientStream.request(1); 1260 assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1261 assertTrue(serverStream.isReady()); 1262 int serverSent; 1263 // Verify that flow control will push back on server. 1264 for (serverSent = 0; serverStream.isReady(); serverSent++) { 1265 if (serverSent > maxToSend) { 1266 // It seems like flow control isn't working. _Surely_ flow control would have pushed-back 1267 // already. If this is normal, please configure the transport to buffer less. 1268 fail("Too many messages sent before isReady() returned false"); 1269 } 1270 serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage)); 1271 serverStream.flush(); 1272 } 1273 assertTrue(serverSent > 0); 1274 // Make sure there are at least a few messages buffered. 1275 for (; serverSent < 5; serverSent++) { 1276 serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage)); 1277 serverStream.flush(); 1278 } 1279 doPingPong(serverListener); 1280 1281 int clientReceived = verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); 1282 1283 serverStream.request(3); 1284 clientStream.request(3); 1285 doPingPong(serverListener); 1286 clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 3); 1287 serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 3); 1288 1289 // Request the rest 1290 serverStream.request(clientSent); 1291 clientStream.request(serverSent); 1292 clientReceived += 1293 verifyMessageCountAndClose(clientStreamListener.messageQueue, serverSent - clientReceived); 1294 serverReceived += 1295 verifyMessageCountAndClose(serverStreamListener.messageQueue, clientSent - serverReceived); 1296 1297 assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1298 assertTrue(clientStream.isReady()); 1299 assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1300 assertTrue(serverStream.isReady()); 1301 1302 // Request four more 1303 for (int i = 0; i < 5; i++) { 1304 clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage)); 1305 clientStream.flush(); 1306 serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage)); 1307 serverStream.flush(); 1308 } 1309 doPingPong(serverListener); 1310 clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 4); 1311 serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 4); 1312 1313 // Drain exactly how many messages are left 1314 serverStream.request(1); 1315 clientStream.request(1); 1316 clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); 1317 serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); 1318 1319 // And now check that the streams can still complete gracefully 1320 clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage)); 1321 clientStream.flush(); 1322 clientStream.halfClose(); 1323 doPingPong(serverListener); 1324 assertFalse(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1325 1326 serverStream.request(1); 1327 serverReceived += verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); 1328 assertEquals(clientSent + 6, serverReceived); 1329 assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1330 1331 serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage)); 1332 serverStream.flush(); 1333 Status status = Status.OK.withDescription("... quite a lengthy discussion"); 1334 serverStream.close(status, new Metadata()); 1335 doPingPong(serverListener); 1336 try { 1337 clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1338 fail("Expected TimeoutException"); 1339 } catch (TimeoutException expectedException) { 1340 } 1341 1342 clientStream.request(1); 1343 clientReceived += verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); 1344 assertEquals(serverSent + 6, clientReceived); 1345 assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1346 Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1347 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1348 assertEquals(status.getCode(), clientStreamStatus.getCode()); 1349 assertEquals(status.getDescription(), clientStreamStatus.getDescription()); 1350 } 1351 verifyMessageCountAndClose(BlockingQueue<InputStream> messageQueue, int count)1352 private int verifyMessageCountAndClose(BlockingQueue<InputStream> messageQueue, int count) 1353 throws Exception { 1354 InputStream message; 1355 for (int i = 0; i < count; i++) { 1356 message = messageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1357 assertNotNull(message); 1358 message.close(); 1359 } 1360 assertNull("no additional message expected", messageQueue.poll()); 1361 return count; 1362 } 1363 1364 @Test interactionsAfterServerStreamCloseAreNoops()1365 public void interactionsAfterServerStreamCloseAreNoops() throws Exception { 1366 server.start(serverListener); 1367 client = newClientTransport(server); 1368 startTransport(client, mockClientTransportListener); 1369 MockServerTransportListener serverTransportListener 1370 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1371 serverTransport = serverTransportListener.transport; 1372 1373 // boilerplate 1374 ClientStream clientStream = 1375 client.newStream(methodDescriptor, new Metadata(), callOptions); 1376 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1377 clientStream.start(clientStreamListener); 1378 StreamCreation server 1379 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1380 1381 // setup 1382 clientStream.request(1); 1383 server.stream.close(Status.INTERNAL, new Metadata()); 1384 assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1385 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1386 1387 // Ensure that for a closed ServerStream, interactions are noops 1388 server.stream.writeHeaders(new Metadata()); 1389 server.stream.writeMessage(methodDescriptor.streamResponse("response")); 1390 server.stream.close(Status.INTERNAL, new Metadata()); 1391 1392 // Make sure new streams still work properly 1393 doPingPong(serverListener); 1394 } 1395 1396 @Test interactionsAfterClientStreamCancelAreNoops()1397 public void interactionsAfterClientStreamCancelAreNoops() throws Exception { 1398 server.start(serverListener); 1399 client = newClientTransport(server); 1400 startTransport(client, mockClientTransportListener); 1401 MockServerTransportListener serverTransportListener 1402 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1403 serverTransport = serverTransportListener.transport; 1404 1405 // boilerplate 1406 ClientStream clientStream = 1407 client.newStream(methodDescriptor, new Metadata(), callOptions); 1408 ClientStreamListener clientListener = mock(ClientStreamListener.class); 1409 clientStream.start(clientListener); 1410 StreamCreation server 1411 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1412 1413 // setup 1414 server.stream.request(1); 1415 clientStream.cancel(Status.UNKNOWN); 1416 assertNotNull(server.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1417 1418 // Ensure that for a cancelled ClientStream, interactions are noops 1419 clientStream.writeMessage(methodDescriptor.streamRequest("request")); 1420 clientStream.halfClose(); 1421 clientStream.cancel(Status.UNKNOWN); 1422 1423 // Make sure new streams still work properly 1424 doPingPong(serverListener); 1425 } 1426 1427 // Not all transports support the tracer yet haveTransportTracer()1428 protected boolean haveTransportTracer() { 1429 return false; 1430 } 1431 1432 @Test transportTracer_streamStarted()1433 public void transportTracer_streamStarted() throws Exception { 1434 server.start(serverListener); 1435 client = newClientTransport(server); 1436 startTransport(client, mockClientTransportListener); 1437 MockServerTransportListener serverTransportListener 1438 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1439 if (!haveTransportTracer()) { 1440 return; 1441 } 1442 1443 // start first stream 1444 long serverFirstTimestampNanos; 1445 long clientFirstTimestampNanos; 1446 { 1447 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1448 assertEquals(0, serverBefore.streamsStarted); 1449 assertEquals(0, serverBefore.lastRemoteStreamCreatedTimeNanos); 1450 TransportStats clientBefore = getTransportStats(client); 1451 assertEquals(0, clientBefore.streamsStarted); 1452 assertEquals(0, clientBefore.lastRemoteStreamCreatedTimeNanos); 1453 1454 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1455 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1456 clientStream.start(clientStreamListener); 1457 StreamCreation serverStreamCreation = serverTransportListener 1458 .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1459 1460 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1461 assertEquals(1, serverAfter.streamsStarted); 1462 serverFirstTimestampNanos = serverAfter.lastRemoteStreamCreatedTimeNanos; 1463 assertEquals(fakeCurrentTimeNanos(), serverAfter.lastRemoteStreamCreatedTimeNanos); 1464 1465 TransportStats clientAfter = getTransportStats(client); 1466 assertEquals(1, clientAfter.streamsStarted); 1467 clientFirstTimestampNanos = clientAfter.lastLocalStreamCreatedTimeNanos; 1468 assertEquals(fakeCurrentTimeNanos(), clientFirstTimestampNanos); 1469 1470 ServerStream serverStream = serverStreamCreation.stream; 1471 serverStream.close(Status.OK, new Metadata()); 1472 } 1473 1474 final long elapsedMillis = 100; 1475 advanceClock(100, TimeUnit.MILLISECONDS); 1476 1477 // start second stream 1478 { 1479 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1480 assertEquals(1, serverBefore.streamsStarted); 1481 TransportStats clientBefore = getTransportStats(client); 1482 assertEquals(1, clientBefore.streamsStarted); 1483 1484 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1485 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1486 clientStream.start(clientStreamListener); 1487 StreamCreation serverStreamCreation = serverTransportListener 1488 .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1489 1490 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1491 assertEquals(2, serverAfter.streamsStarted); 1492 assertEquals( 1493 TimeUnit.MILLISECONDS.toNanos(elapsedMillis), 1494 serverAfter.lastRemoteStreamCreatedTimeNanos - serverFirstTimestampNanos); 1495 assertEquals(fakeCurrentTimeNanos(), serverAfter.lastRemoteStreamCreatedTimeNanos); 1496 1497 TransportStats clientAfter = getTransportStats(client); 1498 assertEquals(2, clientAfter.streamsStarted); 1499 assertEquals( 1500 TimeUnit.MILLISECONDS.toNanos(elapsedMillis), 1501 clientAfter.lastLocalStreamCreatedTimeNanos - clientFirstTimestampNanos); 1502 assertEquals(fakeCurrentTimeNanos(), clientAfter.lastLocalStreamCreatedTimeNanos); 1503 1504 ServerStream serverStream = serverStreamCreation.stream; 1505 serverStream.close(Status.OK, new Metadata()); 1506 } 1507 } 1508 1509 @Test transportTracer_server_streamEnded_ok()1510 public void transportTracer_server_streamEnded_ok() throws Exception { 1511 server.start(serverListener); 1512 client = newClientTransport(server); 1513 startTransport(client, mockClientTransportListener); 1514 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1515 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1516 clientStream.start(clientStreamListener); 1517 MockServerTransportListener serverTransportListener 1518 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1519 StreamCreation serverStreamCreation 1520 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1521 ServerStream serverStream = serverStreamCreation.stream; 1522 if (!haveTransportTracer()) { 1523 return; 1524 } 1525 1526 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1527 assertEquals(0, serverBefore.streamsSucceeded); 1528 assertEquals(0, serverBefore.streamsFailed); 1529 TransportStats clientBefore = getTransportStats(client); 1530 assertEquals(0, clientBefore.streamsSucceeded); 1531 assertEquals(0, clientBefore.streamsFailed); 1532 1533 clientStream.halfClose(); 1534 serverStream.close(Status.OK, new Metadata()); 1535 // do not validate stats until close() has been called on client 1536 assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1537 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1538 1539 1540 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1541 assertEquals(1, serverAfter.streamsSucceeded); 1542 assertEquals(0, serverAfter.streamsFailed); 1543 TransportStats clientAfter = getTransportStats(client); 1544 assertEquals(1, clientAfter.streamsSucceeded); 1545 assertEquals(0, clientAfter.streamsFailed); 1546 } 1547 1548 @Test transportTracer_server_streamEnded_nonOk()1549 public void transportTracer_server_streamEnded_nonOk() throws Exception { 1550 server.start(serverListener); 1551 client = newClientTransport(server); 1552 startTransport(client, mockClientTransportListener); 1553 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1554 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1555 clientStream.start(clientStreamListener); 1556 MockServerTransportListener serverTransportListener 1557 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1558 StreamCreation serverStreamCreation 1559 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1560 ServerStream serverStream = serverStreamCreation.stream; 1561 if (!haveTransportTracer()) { 1562 return; 1563 } 1564 1565 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1566 assertEquals(0, serverBefore.streamsFailed); 1567 assertEquals(0, serverBefore.streamsSucceeded); 1568 TransportStats clientBefore = getTransportStats(client); 1569 assertEquals(0, clientBefore.streamsFailed); 1570 assertEquals(0, clientBefore.streamsSucceeded); 1571 1572 serverStream.close(Status.UNKNOWN, new Metadata()); 1573 // do not validate stats until close() has been called on client 1574 assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1575 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1576 1577 1578 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1579 assertEquals(1, serverAfter.streamsFailed); 1580 assertEquals(0, serverAfter.streamsSucceeded); 1581 TransportStats clientAfter = getTransportStats(client); 1582 assertEquals(1, clientAfter.streamsFailed); 1583 assertEquals(0, clientAfter.streamsSucceeded); 1584 1585 client.shutdown(Status.UNAVAILABLE); 1586 } 1587 1588 @Test transportTracer_client_streamEnded_nonOk()1589 public void transportTracer_client_streamEnded_nonOk() throws Exception { 1590 server.start(serverListener); 1591 client = newClientTransport(server); 1592 startTransport(client, mockClientTransportListener); 1593 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1594 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1595 clientStream.start(clientStreamListener); 1596 MockServerTransportListener serverTransportListener = 1597 serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1598 StreamCreation serverStreamCreation = 1599 serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1600 if (!haveTransportTracer()) { 1601 return; 1602 } 1603 1604 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1605 assertEquals(0, serverBefore.streamsFailed); 1606 assertEquals(0, serverBefore.streamsSucceeded); 1607 TransportStats clientBefore = getTransportStats(client); 1608 assertEquals(0, clientBefore.streamsFailed); 1609 assertEquals(0, clientBefore.streamsSucceeded); 1610 1611 clientStream.cancel(Status.UNKNOWN); 1612 // do not validate stats until close() has been called on server 1613 assertNotNull(serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1614 1615 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1616 assertEquals(1, serverAfter.streamsFailed); 1617 assertEquals(0, serverAfter.streamsSucceeded); 1618 TransportStats clientAfter = getTransportStats(client); 1619 assertEquals(1, clientAfter.streamsFailed); 1620 assertEquals(0, clientAfter.streamsSucceeded); 1621 } 1622 1623 @Test transportTracer_server_receive_msg()1624 public void transportTracer_server_receive_msg() throws Exception { 1625 server.start(serverListener); 1626 client = newClientTransport(server); 1627 startTransport(client, mockClientTransportListener); 1628 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1629 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1630 clientStream.start(clientStreamListener); 1631 MockServerTransportListener serverTransportListener 1632 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1633 StreamCreation serverStreamCreation 1634 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1635 ServerStream serverStream = serverStreamCreation.stream; 1636 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1637 if (!haveTransportTracer()) { 1638 return; 1639 } 1640 1641 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1642 assertEquals(0, serverBefore.messagesReceived); 1643 assertEquals(0, serverBefore.lastMessageReceivedTimeNanos); 1644 TransportStats clientBefore = getTransportStats(client); 1645 assertEquals(0, clientBefore.messagesSent); 1646 assertEquals(0, clientBefore.lastMessageSentTimeNanos); 1647 1648 serverStream.request(1); 1649 clientStream.writeMessage(methodDescriptor.streamRequest("request")); 1650 clientStream.flush(); 1651 clientStream.halfClose(); 1652 verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); 1653 1654 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1655 assertEquals(1, serverAfter.messagesReceived); 1656 assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageReceivedTimeNanos); 1657 TransportStats clientAfter = getTransportStats(client); 1658 assertEquals(1, clientAfter.messagesSent); 1659 assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageSentTimeNanos); 1660 1661 serverStream.close(Status.OK, new Metadata()); 1662 } 1663 1664 @Test transportTracer_server_send_msg()1665 public void transportTracer_server_send_msg() throws Exception { 1666 server.start(serverListener); 1667 client = newClientTransport(server); 1668 startTransport(client, mockClientTransportListener); 1669 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1670 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1671 clientStream.start(clientStreamListener); 1672 MockServerTransportListener serverTransportListener 1673 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1674 StreamCreation serverStreamCreation 1675 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1676 ServerStream serverStream = serverStreamCreation.stream; 1677 if (!haveTransportTracer()) { 1678 return; 1679 } 1680 1681 TransportStats serverBefore = getTransportStats(serverTransportListener.transport); 1682 assertEquals(0, serverBefore.messagesSent); 1683 assertEquals(0, serverBefore.lastMessageSentTimeNanos); 1684 TransportStats clientBefore = getTransportStats(client); 1685 assertEquals(0, clientBefore.messagesReceived); 1686 assertEquals(0, clientBefore.lastMessageReceivedTimeNanos); 1687 1688 clientStream.request(1); 1689 serverStream.writeHeaders(new Metadata()); 1690 serverStream.writeMessage(methodDescriptor.streamResponse("response")); 1691 serverStream.flush(); 1692 verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); 1693 1694 TransportStats serverAfter = getTransportStats(serverTransportListener.transport); 1695 assertEquals(1, serverAfter.messagesSent); 1696 assertEquals(fakeCurrentTimeNanos(), serverAfter.lastMessageSentTimeNanos); 1697 TransportStats clientAfter = getTransportStats(client); 1698 assertEquals(1, clientAfter.messagesReceived); 1699 assertEquals(fakeCurrentTimeNanos(), clientAfter.lastMessageReceivedTimeNanos); 1700 1701 serverStream.close(Status.OK, new Metadata()); 1702 } 1703 1704 @Test socketStats()1705 public void socketStats() throws Exception { 1706 server.start(serverListener); 1707 ManagedClientTransport client = newClientTransport(server); 1708 startTransport(client, mockClientTransportListener); 1709 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1710 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1711 clientStream.start(clientStreamListener); 1712 1713 MockServerTransportListener serverTransportListener 1714 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1715 StreamCreation serverStreamCreation 1716 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1717 ServerStream serverStream = serverStreamCreation.stream; 1718 1719 SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 1720 SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 1721 1722 SocketStats clientSocketStats = client.getStats().get(); 1723 assertEquals(clientAddress, clientSocketStats.local); 1724 assertEquals(serverAddress, clientSocketStats.remote); 1725 // very basic sanity check that socket options are populated 1726 assertNotNull(clientSocketStats.socketOptions.lingerSeconds); 1727 assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF")); 1728 1729 SocketStats serverSocketStats = serverTransportListener.transport.getStats().get(); 1730 assertEquals(serverAddress, serverSocketStats.local); 1731 assertEquals(clientAddress, serverSocketStats.remote); 1732 // very basic sanity check that socket options are populated 1733 assertNotNull(serverSocketStats.socketOptions.lingerSeconds); 1734 assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF")); 1735 } 1736 1737 /** 1738 * Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give 1739 * time for actions _not_ to happen. Since it is based on doing an actual RPC with actual 1740 * callbacks, it generally provides plenty of time for Runnables to execute. But it is also faster 1741 * on faster machines and more reliable on slower machines. 1742 */ doPingPong(MockServerListener serverListener)1743 private void doPingPong(MockServerListener serverListener) throws Exception { 1744 ManagedClientTransport client = newClientTransport(server); 1745 ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); 1746 startTransport(client, listener); 1747 ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); 1748 ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); 1749 clientStream.start(clientStreamListener); 1750 1751 MockServerTransportListener serverTransportListener 1752 = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1753 StreamCreation serverStreamCreation 1754 = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); 1755 ServerStream serverStream = serverStreamCreation.stream; 1756 ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; 1757 1758 serverStream.close(Status.OK, new Metadata()); 1759 assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1760 assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1761 assertNotNull(serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); 1762 client.shutdown(Status.UNAVAILABLE); 1763 } 1764 1765 /** 1766 * Only assert that the Status.Code matches, but provide the entire actual result in case the 1767 * assertion fails. 1768 */ assertCodeEquals(String message, Status expected, Status actual)1769 private static void assertCodeEquals(String message, Status expected, Status actual) { 1770 if (expected == null) { 1771 fail("expected should not be null"); 1772 } 1773 if (actual == null || !expected.getCode().equals(actual.getCode())) { 1774 assertEquals(message, expected, actual); 1775 } 1776 } 1777 assertCodeEquals(Status expected, Status actual)1778 private static void assertCodeEquals(Status expected, Status actual) { 1779 assertCodeEquals(null, expected, actual); 1780 } 1781 assertStatusEquals(Status expected, Status actual)1782 private static void assertStatusEquals(Status expected, Status actual) { 1783 if (expected == null) { 1784 fail("expected should not be null"); 1785 } 1786 if (actual == null || !expected.getCode().equals(actual.getCode()) 1787 || !Objects.equal(expected.getDescription(), actual.getDescription()) 1788 || !Objects.equal(expected.getCause(), actual.getCause())) { 1789 assertEquals(expected, actual); 1790 } 1791 } 1792 waitForFuture(Future<?> future, long timeout, TimeUnit unit)1793 private static boolean waitForFuture(Future<?> future, long timeout, TimeUnit unit) 1794 throws InterruptedException { 1795 try { 1796 future.get(timeout, unit); 1797 } catch (ExecutionException ex) { 1798 throw new AssertionError(ex); 1799 } catch (TimeoutException ex) { 1800 return false; 1801 } 1802 return true; 1803 } 1804 runIfNotNull(Runnable runnable)1805 private static void runIfNotNull(Runnable runnable) { 1806 if (runnable != null) { 1807 runnable.run(); 1808 } 1809 } 1810 startTransport( ManagedClientTransport clientTransport, ManagedClientTransport.Listener listener)1811 private static void startTransport( 1812 ManagedClientTransport clientTransport, 1813 ManagedClientTransport.Listener listener) { 1814 runIfNotNull(clientTransport.start(listener)); 1815 verify(listener, timeout(100)).transportReady(); 1816 } 1817 1818 private static class MockServerListener implements ServerListener { 1819 public final BlockingQueue<MockServerTransportListener> listeners 1820 = new LinkedBlockingQueue<MockServerTransportListener>(); 1821 private final SettableFuture<?> shutdown = SettableFuture.create(); 1822 1823 @Override transportCreated(ServerTransport transport)1824 public ServerTransportListener transportCreated(ServerTransport transport) { 1825 MockServerTransportListener listener = new MockServerTransportListener(transport); 1826 listeners.add(listener); 1827 return listener; 1828 } 1829 1830 @Override serverShutdown()1831 public void serverShutdown() { 1832 assertTrue(shutdown.set(null)); 1833 } 1834 waitForShutdown(long timeout, TimeUnit unit)1835 public boolean waitForShutdown(long timeout, TimeUnit unit) throws InterruptedException { 1836 return waitForFuture(shutdown, timeout, unit); 1837 } 1838 takeListenerOrFail(long timeout, TimeUnit unit)1839 public MockServerTransportListener takeListenerOrFail(long timeout, TimeUnit unit) 1840 throws InterruptedException { 1841 MockServerTransportListener listener = listeners.poll(timeout, unit); 1842 if (listener == null) { 1843 fail("Timed out waiting for server transport"); 1844 } 1845 return listener; 1846 } 1847 } 1848 1849 private static class MockServerTransportListener implements ServerTransportListener { 1850 public final ServerTransport transport; 1851 public final BlockingQueue<StreamCreation> streams = new LinkedBlockingQueue<StreamCreation>(); 1852 private final SettableFuture<?> terminated = SettableFuture.create(); 1853 MockServerTransportListener(ServerTransport transport)1854 public MockServerTransportListener(ServerTransport transport) { 1855 this.transport = transport; 1856 } 1857 1858 @Override streamCreated(ServerStream stream, String method, Metadata headers)1859 public void streamCreated(ServerStream stream, String method, Metadata headers) { 1860 ServerStreamListenerBase listener = new ServerStreamListenerBase(); 1861 streams.add(new StreamCreation(stream, method, headers, listener)); 1862 stream.setListener(listener); 1863 } 1864 1865 @Override transportReady(Attributes attributes)1866 public Attributes transportReady(Attributes attributes) { 1867 return Attributes.newBuilder() 1868 .setAll(attributes) 1869 .set(ADDITIONAL_TRANSPORT_ATTR_KEY, "additional attribute value") 1870 .build(); 1871 } 1872 1873 @Override transportTerminated()1874 public void transportTerminated() { 1875 assertTrue(terminated.set(null)); 1876 } 1877 waitForTermination(long timeout, TimeUnit unit)1878 public boolean waitForTermination(long timeout, TimeUnit unit) throws InterruptedException { 1879 return waitForFuture(terminated, timeout, unit); 1880 } 1881 isTerminated()1882 public boolean isTerminated() { 1883 return terminated.isDone(); 1884 } 1885 takeStreamOrFail(long timeout, TimeUnit unit)1886 public StreamCreation takeStreamOrFail(long timeout, TimeUnit unit) 1887 throws InterruptedException { 1888 StreamCreation stream = streams.poll(timeout, unit); 1889 if (stream == null) { 1890 fail("Timed out waiting for server stream"); 1891 } 1892 return stream; 1893 } 1894 } 1895 1896 private static class ServerStreamListenerBase implements ServerStreamListener { 1897 private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>(); 1898 // Would have used Void instead of Object, but null elements are not allowed 1899 private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<Object>(); 1900 private final CountDownLatch halfClosedLatch = new CountDownLatch(1); 1901 private final SettableFuture<Status> status = SettableFuture.create(); 1902 awaitOnReady(int timeout, TimeUnit unit)1903 private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception { 1904 return readyQueue.poll(timeout, unit) != null; 1905 } 1906 awaitOnReadyAndDrain(int timeout, TimeUnit unit)1907 private boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception { 1908 if (!awaitOnReady(timeout, unit)) { 1909 return false; 1910 } 1911 // Throw the rest away 1912 readyQueue.drainTo(Lists.newArrayList()); 1913 return true; 1914 } 1915 awaitHalfClosed(int timeout, TimeUnit unit)1916 private boolean awaitHalfClosed(int timeout, TimeUnit unit) throws Exception { 1917 return halfClosedLatch.await(timeout, unit); 1918 } 1919 1920 @Override messagesAvailable(MessageProducer producer)1921 public void messagesAvailable(MessageProducer producer) { 1922 if (status.isDone()) { 1923 fail("messagesAvailable invoked after closed"); 1924 } 1925 InputStream message; 1926 while ((message = producer.next()) != null) { 1927 messageQueue.add(message); 1928 } 1929 } 1930 1931 @Override onReady()1932 public void onReady() { 1933 if (status.isDone()) { 1934 fail("onReady invoked after closed"); 1935 } 1936 readyQueue.add(new Object()); 1937 } 1938 1939 @Override halfClosed()1940 public void halfClosed() { 1941 if (status.isDone()) { 1942 fail("halfClosed invoked after closed"); 1943 } 1944 halfClosedLatch.countDown(); 1945 } 1946 1947 @Override closed(Status status)1948 public void closed(Status status) { 1949 if (this.status.isDone()) { 1950 fail("closed invoked more than once"); 1951 } 1952 this.status.set(status); 1953 } 1954 } 1955 1956 private static class ClientStreamListenerBase implements ClientStreamListener { 1957 private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>(); 1958 // Would have used Void instead of Object, but null elements are not allowed 1959 private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<Object>(); 1960 private final SettableFuture<Metadata> headers = SettableFuture.create(); 1961 private final SettableFuture<Metadata> trailers = SettableFuture.create(); 1962 private final SettableFuture<Status> status = SettableFuture.create(); 1963 awaitOnReady(int timeout, TimeUnit unit)1964 private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception { 1965 return readyQueue.poll(timeout, unit) != null; 1966 } 1967 awaitOnReadyAndDrain(int timeout, TimeUnit unit)1968 private boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception { 1969 if (!awaitOnReady(timeout, unit)) { 1970 return false; 1971 } 1972 // Throw the rest away 1973 readyQueue.drainTo(Lists.newArrayList()); 1974 return true; 1975 } 1976 1977 @Override messagesAvailable(MessageProducer producer)1978 public void messagesAvailable(MessageProducer producer) { 1979 if (status.isDone()) { 1980 fail("messagesAvailable invoked after closed"); 1981 } 1982 InputStream message; 1983 while ((message = producer.next()) != null) { 1984 messageQueue.add(message); 1985 } 1986 } 1987 1988 @Override onReady()1989 public void onReady() { 1990 if (status.isDone()) { 1991 fail("onReady invoked after closed"); 1992 } 1993 readyQueue.add(new Object()); 1994 } 1995 1996 @Override headersRead(Metadata headers)1997 public void headersRead(Metadata headers) { 1998 if (status.isDone()) { 1999 fail("headersRead invoked after closed"); 2000 } 2001 this.headers.set(headers); 2002 } 2003 2004 @Override closed(Status status, Metadata trailers)2005 public void closed(Status status, Metadata trailers) { 2006 closed(status, RpcProgress.PROCESSED, trailers); 2007 } 2008 2009 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)2010 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 2011 if (this.status.isDone()) { 2012 fail("headersRead invoked after closed"); 2013 } 2014 this.status.set(status); 2015 this.trailers.set(trailers); 2016 } 2017 } 2018 2019 private static class StreamCreation { 2020 public final ServerStream stream; 2021 public final String method; 2022 public final Metadata headers; 2023 public final ServerStreamListenerBase listener; 2024 StreamCreation( ServerStream stream, String method, Metadata headers, ServerStreamListenerBase listener)2025 public StreamCreation( 2026 ServerStream stream, String method, Metadata headers, ServerStreamListenerBase listener) { 2027 this.stream = stream; 2028 this.method = method; 2029 this.headers = headers; 2030 this.listener = listener; 2031 } 2032 } 2033 2034 private static class StringMarshaller implements MethodDescriptor.Marshaller<String> { 2035 public static final StringMarshaller INSTANCE = new StringMarshaller(); 2036 2037 @Override stream(String value)2038 public InputStream stream(String value) { 2039 return new ByteArrayInputStream(value.getBytes(UTF_8)); 2040 } 2041 2042 @Override parse(InputStream stream)2043 public String parse(InputStream stream) { 2044 try { 2045 return new String(IoUtils.toByteArray(stream), UTF_8); 2046 } catch (IOException ex) { 2047 throw new RuntimeException(ex); 2048 } 2049 } 2050 } 2051 2052 private static class StringBinaryMarshaller implements Metadata.BinaryMarshaller<String> { 2053 public static final StringBinaryMarshaller INSTANCE = new StringBinaryMarshaller(); 2054 2055 @Override toBytes(String value)2056 public byte[] toBytes(String value) { 2057 return value.getBytes(UTF_8); 2058 } 2059 2060 @Override parseBytes(byte[] serialized)2061 public String parseBytes(byte[] serialized) { 2062 return new String(serialized, UTF_8); 2063 } 2064 } 2065 getTransportStats(InternalInstrumented<SocketStats> socket)2066 private static TransportStats getTransportStats(InternalInstrumented<SocketStats> socket) 2067 throws ExecutionException, InterruptedException { 2068 return socket.getStats().get().data; 2069 } 2070 } 2071