1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Charsets.UTF_8;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
22 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
23 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
24 import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
25 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
26 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
27 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
28 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
29 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
30 import static org.junit.Assert.assertEquals;
31 import static org.junit.Assert.assertFalse;
32 import static org.junit.Assert.assertNotNull;
33 import static org.junit.Assert.assertNull;
34 import static org.junit.Assert.assertSame;
35 import static org.junit.Assert.assertTrue;
36 import static org.junit.Assert.fail;
37 
38 import com.google.common.io.ByteStreams;
39 import com.google.common.util.concurrent.SettableFuture;
40 import io.grpc.Attributes;
41 import io.grpc.CallOptions;
42 import io.grpc.Grpc;
43 import io.grpc.InternalChannelz;
44 import io.grpc.Metadata;
45 import io.grpc.MethodDescriptor;
46 import io.grpc.MethodDescriptor.Marshaller;
47 import io.grpc.ServerStreamTracer;
48 import io.grpc.Status;
49 import io.grpc.Status.Code;
50 import io.grpc.StatusException;
51 import io.grpc.internal.ClientStream;
52 import io.grpc.internal.ClientStreamListener;
53 import io.grpc.internal.ClientTransport;
54 import io.grpc.internal.FakeClock;
55 import io.grpc.internal.GrpcUtil;
56 import io.grpc.internal.ManagedClientTransport;
57 import io.grpc.internal.ServerListener;
58 import io.grpc.internal.ServerStream;
59 import io.grpc.internal.ServerStreamListener;
60 import io.grpc.internal.ServerTransport;
61 import io.grpc.internal.ServerTransportListener;
62 import io.grpc.internal.TransportTracer;
63 import io.grpc.internal.testing.TestUtils;
64 import io.netty.channel.ChannelConfig;
65 import io.netty.channel.ChannelOption;
66 import io.netty.channel.nio.NioEventLoopGroup;
67 import io.netty.channel.socket.SocketChannelConfig;
68 import io.netty.channel.socket.nio.NioServerSocketChannel;
69 import io.netty.channel.socket.nio.NioSocketChannel;
70 import io.netty.handler.codec.http2.StreamBufferingEncoder;
71 import io.netty.handler.ssl.ClientAuth;
72 import io.netty.handler.ssl.SslContext;
73 import io.netty.handler.ssl.SupportedCipherSuiteFilter;
74 import io.netty.util.AsciiString;
75 import java.io.ByteArrayInputStream;
76 import java.io.File;
77 import java.io.IOException;
78 import java.io.InputStream;
79 import java.net.InetSocketAddress;
80 import java.util.ArrayList;
81 import java.util.Collections;
82 import java.util.HashMap;
83 import java.util.List;
84 import java.util.Map;
85 import java.util.concurrent.ExecutionException;
86 import java.util.concurrent.TimeUnit;
87 import java.util.concurrent.TimeoutException;
88 import javax.net.ssl.SSLHandshakeException;
89 import org.junit.After;
90 import org.junit.Before;
91 import org.junit.Test;
92 import org.junit.runner.RunWith;
93 import org.junit.runners.JUnit4;
94 import org.mockito.Mock;
95 import org.mockito.MockitoAnnotations;
96 
97 /**
98  * Tests for {@link NettyClientTransport}.
99  */
100 @RunWith(JUnit4.class)
101 public class NettyClientTransportTest {
  /** Server-side TLS context built once from the {@code server1} test certificate and key. */
  private static final SslContext SSL_CONTEXT = createSslContext();

  /** Mock listener handed to each client transport when it is started. */
  @Mock
  private ManagedClientTransport.Listener clientTransportListener;

  /** Client transports registered by the tests; {@link #teardown()} shuts each of them down. */
  private final List<NettyClientTransport> transports = new ArrayList<>();
  /** Event loop group passed to both the client transports and the server. */
  private final NioEventLoopGroup group = new NioEventLoopGroup(1);
  /** Listener given to the server; it records created transports and streams. */
  private final EchoServerListener serverListener = new EchoServerListener();
  /** Channelz instance passed to the {@link NettyServer} constructor. */
  private final InternalChannelz channelz = new InternalChannelz();
  /** No-op callback passed to every client transport constructor. */
  private Runnable tooManyPingsRunnable = new Runnable() {
    // Throwing is useless in this method, because Netty doesn't propagate the exception
    @Override public void run() {}
  };
  /** Attributes passed to transports built by {@code newTransport}; a test may replace them. */
  private Attributes eagAttributes = Attributes.EMPTY;

  /** Negotiator given to the server in {@code startServer}; TLS unless a test reassigns it. */
  private ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(SSL_CONTEXT);

  /** Address the client transports connect to; set by {@code startServer} or by the test. */
  private InetSocketAddress address;
  /** Authority derived from {@link #address}'s host and port. */
  private String authority;
  /** Server started by {@code startServer}; stays {@code null} for tests that never start one. */
  private NettyServer server;
122 
  /** Initializes the {@code @Mock}-annotated fields of this test instance before each test. */
  @Before
  public void setup() {
    MockitoAnnotations.initMocks(this);
  }
127 
128   @After
teardown()129   public void teardown() throws Exception {
130     for (NettyClientTransport transport : transports) {
131       transport.shutdown(Status.UNAVAILABLE);
132     }
133 
134     if (server != null) {
135       server.shutdown();
136     }
137 
138     group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
139   }
140 
141   @Test
testToString()142   public void testToString() throws Exception {
143     address = TestUtils.testServerAddress(12345);
144     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
145     String s = newTransport(newNegotiator()).toString();
146     transports.clear();
147     assertTrue("Unexpected: " + s, s.contains("NettyClientTransport"));
148     assertTrue("Unexpected: " + s, s.contains(address.toString()));
149   }
150 
151   @Test
addDefaultUserAgent()152   public void addDefaultUserAgent() throws Exception {
153     startServer();
154     NettyClientTransport transport = newTransport(newNegotiator());
155     callMeMaybe(transport.start(clientTransportListener));
156 
157     // Send a single RPC and wait for the response.
158     new Rpc(transport).halfClose().waitForResponse();
159 
160     // Verify that the received headers contained the User-Agent.
161     assertEquals(1, serverListener.streamListeners.size());
162 
163     Metadata headers = serverListener.streamListeners.get(0).headers;
164     assertEquals(GrpcUtil.getGrpcUserAgent("netty", null), headers.get(USER_AGENT_KEY));
165   }
166 
167   @Test
setSoLingerChannelOption()168   public void setSoLingerChannelOption() throws IOException {
169     startServer();
170     Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
171     // set SO_LINGER option
172     int soLinger = 123;
173     channelOptions.put(ChannelOption.SO_LINGER, soLinger);
174     NettyClientTransport transport = new NettyClientTransport(
175         address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
176         DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
177         KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
178         tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY);
179     transports.add(transport);
180     callMeMaybe(transport.start(clientTransportListener));
181 
182     // verify SO_LINGER has been set
183     ChannelConfig config = transport.channel().config();
184     assertTrue(config instanceof SocketChannelConfig);
185     assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger());
186   }
187 
188   @Test
overrideDefaultUserAgent()189   public void overrideDefaultUserAgent() throws Exception {
190     startServer();
191     NettyClientTransport transport = newTransport(newNegotiator(),
192         DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
193     callMeMaybe(transport.start(clientTransportListener));
194 
195     new Rpc(transport, new Metadata()).halfClose().waitForResponse();
196 
197     // Verify that the received headers contained the User-Agent.
198     assertEquals(1, serverListener.streamListeners.size());
199     Metadata receivedHeaders = serverListener.streamListeners.get(0).headers;
200     assertEquals(GrpcUtil.getGrpcUserAgent("netty", "testUserAgent"),
201         receivedHeaders.get(USER_AGENT_KEY));
202   }
203 
204   @Test
maxMessageSizeShouldBeEnforced()205   public void maxMessageSizeShouldBeEnforced() throws Throwable {
206     startServer();
207     // Allow the response payloads of up to 1 byte.
208     NettyClientTransport transport = newTransport(newNegotiator(),
209         1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
210     callMeMaybe(transport.start(clientTransportListener));
211 
212     try {
213       // Send a single RPC and wait for the response.
214       new Rpc(transport).halfClose().waitForResponse();
215       fail("Expected the stream to fail.");
216     } catch (ExecutionException e) {
217       Status status = Status.fromThrowable(e);
218       assertEquals(Code.RESOURCE_EXHAUSTED, status.getCode());
219       assertTrue("Missing exceeds maximum from: " + status.getDescription(),
220           status.getDescription().contains("exceeds maximum"));
221     }
222   }
223 
224   /**
225    * Verifies that we can create multiple TLS client transports from the same builder.
226    */
227   @Test
creatingMultipleTlsTransportsShouldSucceed()228   public void creatingMultipleTlsTransportsShouldSucceed() throws Exception {
229     startServer();
230 
231     // Create a couple client transports.
232     ProtocolNegotiator negotiator = newNegotiator();
233     for (int index = 0; index < 2; ++index) {
234       NettyClientTransport transport = newTransport(negotiator);
235       callMeMaybe(transport.start(clientTransportListener));
236     }
237 
238     // Send a single RPC on each transport.
239     final List<Rpc> rpcs = new ArrayList<>(transports.size());
240     for (NettyClientTransport transport : transports) {
241       rpcs.add(new Rpc(transport).halfClose());
242     }
243 
244     // Wait for the RPCs to complete.
245     for (Rpc rpc : rpcs) {
246       rpc.waitForResponse();
247     }
248   }
249 
250   @Test
negotiationFailurePropagatesToStatus()251   public void negotiationFailurePropagatesToStatus() throws Exception {
252     negotiator = ProtocolNegotiators.serverPlaintext();
253     startServer();
254 
255     final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator();
256     final NettyClientTransport transport = newTransport(negotiator);
257     callMeMaybe(transport.start(clientTransportListener));
258     final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
259     transport.channel().eventLoop().execute(new Runnable() {
260       @Override
261       public void run() {
262         negotiator.handler.fail(transport.channel().pipeline().context(negotiator.handler),
263             failureStatus.asRuntimeException());
264       }
265     });
266 
267     Rpc rpc = new Rpc(transport).halfClose();
268     try {
269       rpc.waitForClose();
270       fail("expected exception");
271     } catch (ExecutionException ex) {
272       assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
273     }
274   }
275 
276   @Test
tlsNegotiationFailurePropagatesToStatus()277   public void tlsNegotiationFailurePropagatesToStatus() throws Exception {
278     File serverCert = TestUtils.loadCert("server1.pem");
279     File serverKey = TestUtils.loadCert("server1.key");
280     // Don't trust ca.pem, so that client auth fails
281     SslContext sslContext = GrpcSslContexts.forServer(serverCert, serverKey)
282         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
283         .clientAuth(ClientAuth.REQUIRE)
284         .build();
285     negotiator = ProtocolNegotiators.serverTls(sslContext);
286     startServer();
287 
288     File caCert = TestUtils.loadCert("ca.pem");
289     File clientCert = TestUtils.loadCert("client.pem");
290     File clientKey = TestUtils.loadCert("client.key");
291     SslContext clientContext = GrpcSslContexts.forClient()
292         .trustManager(caCert)
293         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
294         .keyManager(clientCert, clientKey)
295         .build();
296     ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext);
297     final NettyClientTransport transport = newTransport(negotiator);
298     callMeMaybe(transport.start(clientTransportListener));
299 
300     Rpc rpc = new Rpc(transport).halfClose();
301     try {
302       rpc.waitForClose();
303       fail("expected exception");
304     } catch (ExecutionException ex) {
305       StatusException sre = (StatusException) ex.getCause();
306       assertEquals(Status.Code.UNAVAILABLE, sre.getStatus().getCode());
307       assertThat(sre.getCause()).isInstanceOf(SSLHandshakeException.class);
308       assertThat(sre.getCause().getMessage()).contains("SSLV3_ALERT_HANDSHAKE_FAILURE");
309     }
310   }
311 
312   @Test
channelExceptionDuringNegotiatonPropagatesToStatus()313   public void channelExceptionDuringNegotiatonPropagatesToStatus() throws Exception {
314     negotiator = ProtocolNegotiators.serverPlaintext();
315     startServer();
316 
317     NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator();
318     NettyClientTransport transport = newTransport(negotiator);
319     callMeMaybe(transport.start(clientTransportListener));
320     final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
321     transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException());
322 
323     Rpc rpc = new Rpc(transport).halfClose();
324     try {
325       rpc.waitForClose();
326       fail("expected exception");
327     } catch (ExecutionException ex) {
328       assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
329     }
330   }
331 
332   @Test
handlerExceptionDuringNegotiatonPropagatesToStatus()333   public void handlerExceptionDuringNegotiatonPropagatesToStatus() throws Exception {
334     negotiator = ProtocolNegotiators.serverPlaintext();
335     startServer();
336 
337     final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator();
338     final NettyClientTransport transport = newTransport(negotiator);
339     callMeMaybe(transport.start(clientTransportListener));
340     final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
341     transport.channel().eventLoop().execute(new Runnable() {
342       @Override
343       public void run() {
344         try {
345           negotiator.handler.exceptionCaught(
346               transport.channel().pipeline().context(negotiator.handler),
347               failureStatus.asRuntimeException());
348         } catch (Exception ex) {
349           throw new RuntimeException(ex);
350         }
351       }
352     });
353 
354     Rpc rpc = new Rpc(transport).halfClose();
355     try {
356       rpc.waitForClose();
357       fail("expected exception");
358     } catch (ExecutionException ex) {
359       assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
360     }
361   }
362 
363   @Test
bufferedStreamsShouldBeClosedWhenConnectionTerminates()364   public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Exception {
365     // Only allow a single stream active at a time.
366     startServer(1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
367 
368     NettyClientTransport transport = newTransport(newNegotiator());
369     callMeMaybe(transport.start(clientTransportListener));
370 
371     // Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS
372     // has been received by the remote endpoint.
373     new Rpc(transport).halfClose().waitForResponse();
374 
375     // Create 3 streams, but don't half-close. The transport will buffer the second and third.
376     Rpc[] rpcs = new Rpc[] { new Rpc(transport), new Rpc(transport), new Rpc(transport) };
377 
378     // Wait for the response for the stream that was actually created.
379     rpcs[0].waitForResponse();
380 
381     // Now forcibly terminate the connection from the server side.
382     serverListener.transports.get(0).channel().pipeline().firstContext().close();
383 
384     // Now wait for both listeners to be closed.
385     for (int i = 1; i < rpcs.length; i++) {
386       try {
387         rpcs[i].waitForClose();
388         fail("Expected the RPC to fail");
389       } catch (ExecutionException e) {
390         // Expected.
391         Throwable t = getRootCause(e);
392         // Make sure that the Http2ChannelClosedException got replaced with the real cause of
393         // the shutdown.
394         assertFalse(t instanceof StreamBufferingEncoder.Http2ChannelClosedException);
395       }
396     }
397   }
398 
  /**
   * Channel type whose constructor always throws, used to exercise the transport's handling of
   * a channel that cannot be instantiated.
   */
  public static class CantConstructChannel extends NioSocketChannel {
    /** Always throws {@link CantConstructChannelError}; no instance can ever be obtained. */
    public CantConstructChannel() {
      // Use an Error because we've seen cases of channels failing to construct due to classloading
      // problems (like mixing different versions of Netty), and those involve Errors.
      throw new CantConstructChannelError();
    }
  }
407 
  /** Marker error thrown by the {@link CantConstructChannel} constructor. */
  private static class CantConstructChannelError extends Error {}
409 
410   @Test
failingToConstructChannelShouldFailGracefully()411   public void failingToConstructChannelShouldFailGracefully() throws Exception {
412     address = TestUtils.testServerAddress(12345);
413     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
414     NettyClientTransport transport = new NettyClientTransport(
415         address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
416         newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
417         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
418         null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY);
419     transports.add(transport);
420 
421     // Should not throw
422     callMeMaybe(transport.start(clientTransportListener));
423 
424     // And RPCs and PINGs should fail cleanly, reporting the failure
425     Rpc rpc = new Rpc(transport);
426     try {
427       rpc.waitForResponse();
428       fail("Expected exception");
429     } catch (Exception ex) {
430       if (!(getRootCause(ex) instanceof CantConstructChannelError)) {
431         throw new AssertionError("Could not find expected error", ex);
432       }
433     }
434 
435     final SettableFuture<Object> pingResult = SettableFuture.create();
436     FakeClock clock = new FakeClock();
437     ClientTransport.PingCallback pingCallback = new ClientTransport.PingCallback() {
438       @Override
439       public void onSuccess(long roundTripTimeNanos) {
440         pingResult.set(roundTripTimeNanos);
441       }
442 
443       @Override
444       public void onFailure(Throwable cause) {
445         pingResult.setException(cause);
446       }
447     };
448     transport.ping(pingCallback, clock.getScheduledExecutorService());
449     assertFalse(pingResult.isDone());
450     clock.runDueTasks();
451     assertTrue(pingResult.isDone());
452     try {
453       pingResult.get();
454       fail("Expected exception");
455     } catch (Exception ex) {
456       if (!(getRootCause(ex) instanceof CantConstructChannelError)) {
457         throw new AssertionError("Could not find expected error", ex);
458       }
459     }
460   }
461 
462   @Test
maxHeaderListSizeShouldBeEnforcedOnClient()463   public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
464     startServer();
465 
466     NettyClientTransport transport =
467         newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
468     callMeMaybe(transport.start(clientTransportListener));
469 
470     try {
471       // Send a single RPC and wait for the response.
472       new Rpc(transport, new Metadata()).halfClose().waitForResponse();
473       fail("The stream should have been failed due to client received header exceeds header list"
474           + " size limit!");
475     } catch (Exception e) {
476       Throwable rootCause = getRootCause(e);
477       Status status = ((StatusException) rootCause).getStatus();
478       assertEquals(Status.Code.INTERNAL, status.getCode());
479       assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream",
480           status.getDescription());
481     }
482   }
483 
484   @Test
maxHeaderListSizeShouldBeEnforcedOnServer()485   public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception {
486     startServer(100, 1);
487 
488     NettyClientTransport transport = newTransport(newNegotiator());
489     callMeMaybe(transport.start(clientTransportListener));
490 
491     try {
492       // Send a single RPC and wait for the response.
493       new Rpc(transport, new Metadata()).halfClose().waitForResponse();
494       fail("The stream should have been failed due to server received header exceeds header list"
495           + " size limit!");
496     } catch (Exception e) {
497       Status status = Status.fromThrowable(e);
498       assertEquals(status.toString(), Status.Code.INTERNAL, status.getCode());
499     }
500   }
501 
502   @Test
getAttributes_negotiatorHandler()503   public void getAttributes_negotiatorHandler() throws Exception {
504     address = TestUtils.testServerAddress(12345);
505     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
506 
507     NettyClientTransport transport = newTransport(new NoopProtocolNegotiator());
508     callMeMaybe(transport.start(clientTransportListener));
509 
510     assertEquals(Attributes.EMPTY, transport.getAttributes());
511   }
512 
513   @Test
getEagAttributes_negotiatorHandler()514   public void getEagAttributes_negotiatorHandler() throws Exception {
515     address = TestUtils.testServerAddress(12345);
516     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
517 
518     NoopProtocolNegotiator npn = new NoopProtocolNegotiator();
519     eagAttributes = Attributes.newBuilder()
520         .set(Attributes.Key.create("trash"), "value")
521         .build();
522     NettyClientTransport transport = newTransport(npn);
523     callMeMaybe(transport.start(clientTransportListener));
524 
525     // EAG Attributes are available before the negotiation is complete
526     assertSame(eagAttributes, npn.grpcHandler.getEagAttributes());
527   }
528 
529   @Test
clientStreamGetsAttributes()530   public void clientStreamGetsAttributes() throws Exception {
531     startServer();
532     NettyClientTransport transport = newTransport(newNegotiator());
533     callMeMaybe(transport.start(clientTransportListener));
534     Rpc rpc = new Rpc(transport).halfClose();
535     rpc.waitForResponse();
536 
537     assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
538     assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
539   }
540 
541   @Test
keepAliveEnabled()542   public void keepAliveEnabled() throws Exception {
543     startServer();
544     NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
545         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
546     callMeMaybe(transport.start(clientTransportListener));
547     Rpc rpc = new Rpc(transport).halfClose();
548     rpc.waitForResponse();
549 
550     assertNotNull(transport.keepAliveManager());
551   }
552 
553   @Test
keepAliveDisabled()554   public void keepAliveDisabled() throws Exception {
555     startServer();
556     NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
557         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
558     callMeMaybe(transport.start(clientTransportListener));
559     Rpc rpc = new Rpc(transport).halfClose();
560     rpc.waitForResponse();
561 
562     assertNull(transport.keepAliveManager());
563   }
564 
getRootCause(Throwable t)565   private Throwable getRootCause(Throwable t) {
566     if (t.getCause() == null) {
567       return t;
568     }
569     return getRootCause(t.getCause());
570   }
571 
newNegotiator()572   private ProtocolNegotiator newNegotiator() throws IOException {
573     File caCert = TestUtils.loadCert("ca.pem");
574     SslContext clientContext = GrpcSslContexts.forClient().trustManager(caCert)
575         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
576     return ProtocolNegotiators.tls(clientContext);
577   }
578 
newTransport(ProtocolNegotiator negotiator)579   private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
580     return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
581         null /* user agent */, true /* keep alive */);
582   }
583 
newTransport(ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent, boolean enableKeepAlive)584   private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
585       int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
586     long keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
587     long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L);
588     if (enableKeepAlive) {
589       keepAliveTimeNano = TimeUnit.SECONDS.toNanos(10L);
590     }
591     NettyClientTransport transport = new NettyClientTransport(
592         address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
593         DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
594         keepAliveTimeNano, keepAliveTimeoutNano,
595         false, authority, userAgent, tooManyPingsRunnable,
596         new TransportTracer(), eagAttributes);
597     transports.add(transport);
598     return transport;
599   }
600 
startServer()601   private void startServer() throws IOException {
602     startServer(100, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
603   }
604 
startServer(int maxStreamsPerConnection, int maxHeaderListSize)605   private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
606     server = new NettyServer(
607         TestUtils.testServerAddress(0),
608         NioServerSocketChannel.class,
609         new HashMap<ChannelOption<?>, Object>(),
610         group, group, negotiator,
611         Collections.<ServerStreamTracer.Factory>emptyList(),
612         TransportTracer.getDefaultFactory(),
613         maxStreamsPerConnection,
614         DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
615         DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
616         MAX_CONNECTION_IDLE_NANOS_DISABLED,
617         MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0,
618         channelz);
619     server.start(serverListener);
620     address = TestUtils.testServerAddress(server.getPort());
621     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
622   }
623 
callMeMaybe(Runnable r)624   private void callMeMaybe(Runnable r) {
625     if (r != null) {
626       r.run();
627     }
628   }
629 
createSslContext()630   private static SslContext createSslContext() {
631     try {
632       File serverCert = TestUtils.loadCert("server1.pem");
633       File key = TestUtils.loadCert("server1.key");
634       return GrpcSslContexts.forServer(serverCert, key)
635           .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
636     } catch (IOException ex) {
637       throw new RuntimeException(ex);
638     }
639   }
640 
641   private static class Rpc {
642     static final String MESSAGE = "hello";
643     static final MethodDescriptor<String, String> METHOD =
644         MethodDescriptor.<String, String>newBuilder()
645             .setType(MethodDescriptor.MethodType.UNARY)
646             .setFullMethodName("testService/test")
647             .setRequestMarshaller(StringMarshaller.INSTANCE)
648             .setResponseMarshaller(StringMarshaller.INSTANCE)
649             .build();
650 
651     final ClientStream stream;
652     final TestClientStreamListener listener = new TestClientStreamListener();
653 
Rpc(NettyClientTransport transport)654     Rpc(NettyClientTransport transport) {
655       this(transport, new Metadata());
656     }
657 
Rpc(NettyClientTransport transport, Metadata headers)658     Rpc(NettyClientTransport transport, Metadata headers) {
659       stream = transport.newStream(METHOD, headers, CallOptions.DEFAULT);
660       stream.start(listener);
661       stream.request(1);
662       stream.writeMessage(new ByteArrayInputStream(MESSAGE.getBytes(UTF_8)));
663       stream.flush();
664     }
665 
halfClose()666     Rpc halfClose() {
667       stream.halfClose();
668       return this;
669     }
670 
waitForResponse()671     void waitForResponse() throws InterruptedException, ExecutionException, TimeoutException {
672       listener.responseFuture.get(10, TimeUnit.SECONDS);
673     }
674 
waitForClose()675     void waitForClose() throws InterruptedException, ExecutionException, TimeoutException {
676       listener.closedFuture.get(10, TimeUnit.SECONDS);
677     }
678   }
679 
680   private static final class TestClientStreamListener implements ClientStreamListener {
681     final SettableFuture<Void> closedFuture = SettableFuture.create();
682     final SettableFuture<Void> responseFuture = SettableFuture.create();
683 
684     @Override
headersRead(Metadata headers)685     public void headersRead(Metadata headers) {
686     }
687 
688     @Override
closed(Status status, Metadata trailers)689     public void closed(Status status, Metadata trailers) {
690       closed(status, RpcProgress.PROCESSED, trailers);
691     }
692 
693     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)694     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
695       if (status.isOk()) {
696         closedFuture.set(null);
697       } else {
698         StatusException e = status.asException();
699         closedFuture.setException(e);
700         responseFuture.setException(e);
701       }
702     }
703 
704     @Override
messagesAvailable(MessageProducer producer)705     public void messagesAvailable(MessageProducer producer) {
706       if (producer.next() != null) {
707         responseFuture.set(null);
708       }
709     }
710 
711     @Override
onReady()712     public void onReady() {
713     }
714   }
715 
716   private static final class EchoServerStreamListener implements ServerStreamListener {
717     final ServerStream stream;
718     final String method;
719     final Metadata headers;
720 
EchoServerStreamListener(ServerStream stream, String method, Metadata headers)721     EchoServerStreamListener(ServerStream stream, String method, Metadata headers) {
722       this.stream = stream;
723       this.method = method;
724       this.headers = headers;
725     }
726 
727     @Override
messagesAvailable(MessageProducer producer)728     public void messagesAvailable(MessageProducer producer) {
729       InputStream message;
730       while ((message = producer.next()) != null) {
731         // Just echo back the message.
732         stream.writeMessage(message);
733         stream.flush();
734       }
735     }
736 
737     @Override
onReady()738     public void onReady() {
739     }
740 
741     @Override
halfClosed()742     public void halfClosed() {
743       // Just close when the client closes.
744       stream.close(Status.OK, new Metadata());
745     }
746 
747     @Override
closed(Status status)748     public void closed(Status status) {
749     }
750   }
751 
752   private static final class EchoServerListener implements ServerListener {
753     final List<NettyServerTransport> transports = new ArrayList<>();
754     final List<EchoServerStreamListener> streamListeners =
755             Collections.synchronizedList(new ArrayList<EchoServerStreamListener>());
756 
757     @Override
transportCreated(final ServerTransport transport)758     public ServerTransportListener transportCreated(final ServerTransport transport) {
759       transports.add((NettyServerTransport) transport);
760       return new ServerTransportListener() {
761         @Override
762         public void streamCreated(ServerStream stream, String method, Metadata headers) {
763           EchoServerStreamListener listener = new EchoServerStreamListener(stream, method, headers);
764           stream.setListener(listener);
765           stream.writeHeaders(new Metadata());
766           stream.request(1);
767           streamListeners.add(listener);
768         }
769 
770         @Override
771         public Attributes transportReady(Attributes transportAttrs) {
772           return transportAttrs;
773         }
774 
775         @Override
776         public void transportTerminated() {}
777       };
778     }
779 
780     @Override
serverShutdown()781     public void serverShutdown() {
782     }
783   }
784 
785   private static final class StringMarshaller implements Marshaller<String> {
786     static final StringMarshaller INSTANCE = new StringMarshaller();
787 
788     @Override
789     public InputStream stream(String value) {
790       return new ByteArrayInputStream(value.getBytes(UTF_8));
791     }
792 
793     @Override
794     public String parse(InputStream stream) {
795       try {
796         return new String(ByteStreams.toByteArray(stream), UTF_8);
797       } catch (IOException ex) {
798         throw new RuntimeException(ex);
799       }
800     }
801   }
802 
  /**
   * Negotiation handler that adds no behavior of its own on top of
   * {@link ProtocolNegotiators.AbstractBufferingHandler}; it only reports the HTTP scheme.
   */
  private static class NoopHandler extends ProtocolNegotiators.AbstractBufferingHandler
      implements ProtocolNegotiator.Handler {
    public NoopHandler(GrpcHttp2ConnectionHandler grpcHandler) {
      super(grpcHandler);
    }

    /** Returns {@code Utils.HTTP}. */
    @Override
    public AsciiString scheme() {
      return Utils.HTTP;
    }
  }
814 
815   private static class NoopProtocolNegotiator implements ProtocolNegotiator {
816     GrpcHttp2ConnectionHandler grpcHandler;
817     NoopHandler handler;
818 
819     @Override
820     public Handler newHandler(final GrpcHttp2ConnectionHandler grpcHandler) {
821       this.grpcHandler = grpcHandler;
822       return handler = new NoopHandler(grpcHandler);
823     }
824   }
825 }
826