1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
21 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26 
27 import com.google.protobuf.ByteString;
28 import io.grpc.CallOptions;
29 import io.grpc.Channel;
30 import io.grpc.ClientCall;
31 import io.grpc.ClientCall.Listener;
32 import io.grpc.ClientInterceptor;
33 import io.grpc.Codec;
34 import io.grpc.CompressorRegistry;
35 import io.grpc.DecompressorRegistry;
36 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
37 import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
38 import io.grpc.ManagedChannel;
39 import io.grpc.ManagedChannelBuilder;
40 import io.grpc.Metadata;
41 import io.grpc.MethodDescriptor;
42 import io.grpc.Server;
43 import io.grpc.ServerBuilder;
44 import io.grpc.ServerCall;
45 import io.grpc.ServerCallHandler;
46 import io.grpc.ServerInterceptor;
47 import io.grpc.ServerInterceptors;
48 import io.grpc.stub.StreamObserver;
49 import io.grpc.testing.integration.Messages.Payload;
50 import io.grpc.testing.integration.Messages.SimpleRequest;
51 import io.grpc.testing.integration.Messages.SimpleResponse;
52 import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub;
53 import io.grpc.testing.integration.TransportCompressionTest.Fzip;
54 import java.nio.charset.Charset;
55 import java.util.ArrayList;
56 import java.util.Collection;
57 import java.util.List;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.ScheduledExecutorService;
60 import org.junit.After;
61 import org.junit.Before;
62 import org.junit.Test;
63 import org.junit.runner.RunWith;
64 import org.junit.runners.Parameterized;
65 import org.junit.runners.Parameterized.Parameters;
66 
67 /**
68  * Tests for compression configurations.
69  *
70  * <p>Because of the asymmetry of clients and servers, clients will not know what decompression
71  * methods the server supports.  In cases where the client is willing to encode, and the server
72  * is willing to decode, a second RPC is sent to show that the client has learned and will use
73  * the encoding.
74  *
75  * <p>In cases where compression is negotiated, but either the client or the server doesn't
76  * actually want to encode, a dummy codec is used to record usage.  If compression is not enabled,
77  * the codec will see no data pass through.  This is checked on each test to ensure the code is
78  * doing the right thing.
79  */
80 @RunWith(Parameterized.class)
81 public class CompressionTest {
82   private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
83   // Ensures that both the request and response messages are more than 0 bytes.  The framer/deframer
84   // may not use the compressor if the message is empty.
85   private static final SimpleRequest REQUEST = SimpleRequest.newBuilder()
86       .setResponseSize(1)
87       .build();
88 
89   private Fzip clientCodec = new Fzip("fzip", Codec.Identity.NONE);
90   private Fzip serverCodec = new Fzip("fzip", Codec.Identity.NONE);
91   private DecompressorRegistry clientDecompressors = DecompressorRegistry.emptyInstance();
92   private DecompressorRegistry serverDecompressors = DecompressorRegistry.emptyInstance();
93   private CompressorRegistry clientCompressors = CompressorRegistry.newEmptyInstance();
94   private CompressorRegistry serverCompressors = CompressorRegistry.newEmptyInstance();
95 
96   /** The headers received by the server from the client. */
97   private volatile Metadata serverResponseHeaders;
98   /** The headers received by the client from the server. */
99   private volatile Metadata clientResponseHeaders;
100 
101   // Params
102   private final boolean enableClientMessageCompression;
103   private final boolean enableServerMessageCompression;
104   private final boolean clientAcceptEncoding;
105   private final boolean clientEncoding;
106   private final boolean serverAcceptEncoding;
107   private final boolean serverEncoding;
108 
109   private Server server;
110   private ManagedChannel channel;
111   private TestServiceBlockingStub stub;
112 
113   /**
114    * Auto called by test.
115    */
CompressionTest( boolean enableClientMessageCompression, boolean clientAcceptEncoding, boolean clientEncoding, boolean enableServerMessageCompression, boolean serverAcceptEncoding, boolean serverEncoding)116   public CompressionTest(
117       boolean enableClientMessageCompression,
118       boolean clientAcceptEncoding,
119       boolean clientEncoding,
120       boolean enableServerMessageCompression,
121       boolean serverAcceptEncoding,
122       boolean serverEncoding) {
123     this.enableClientMessageCompression = enableClientMessageCompression;
124     this.clientAcceptEncoding = clientAcceptEncoding;
125     this.clientEncoding = clientEncoding;
126     this.enableServerMessageCompression = enableServerMessageCompression;
127     this.serverAcceptEncoding = serverAcceptEncoding;
128     this.serverEncoding = serverEncoding;
129   }
130 
131   @Before
setUp()132   public void setUp() throws Exception {
133     clientDecompressors = clientDecompressors.with(Codec.Identity.NONE, false);
134     serverDecompressors = serverDecompressors.with(Codec.Identity.NONE, false);
135   }
136 
137   @After
tearDown()138   public void tearDown() {
139     channel.shutdownNow();
140     server.shutdownNow();
141     executor.shutdownNow();
142   }
143 
144   /**
145    * Parameters for test.
146    */
147   @Parameters
params()148   public static Collection<Object[]> params() {
149     boolean[] bools = new boolean[]{false, true};
150     List<Object[]> combos = new ArrayList<Object[]>(64);
151     for (boolean enableClientMessageCompression : bools) {
152       for (boolean clientAcceptEncoding : bools) {
153         for (boolean clientEncoding : bools) {
154           for (boolean enableServerMessageCompression : bools) {
155             for (boolean serverAcceptEncoding : bools) {
156               for (boolean serverEncoding : bools) {
157                 combos.add(new Object[] {
158                     enableClientMessageCompression, clientAcceptEncoding, clientEncoding,
159                     enableServerMessageCompression, serverAcceptEncoding, serverEncoding});
160               }
161             }
162           }
163         }
164       }
165     }
166     return combos;
167   }
168 
169   @Test
compression()170   public void compression() throws Exception {
171     if (clientAcceptEncoding) {
172       clientDecompressors = clientDecompressors.with(clientCodec, true);
173     }
174     if (clientEncoding) {
175       clientCompressors.register(clientCodec);
176     }
177     if (serverAcceptEncoding) {
178       serverDecompressors = serverDecompressors.with(serverCodec, true);
179     }
180     if (serverEncoding) {
181       serverCompressors.register(serverCodec);
182     }
183 
184     server = ServerBuilder.forPort(0)
185         .addService(
186             ServerInterceptors.intercept(new LocalServer(), new ServerCompressorInterceptor()))
187         .compressorRegistry(serverCompressors)
188         .decompressorRegistry(serverDecompressors)
189         .build()
190         .start();
191 
192     channel = ManagedChannelBuilder.forAddress("localhost", server.getPort())
193         .decompressorRegistry(clientDecompressors)
194         .compressorRegistry(clientCompressors)
195         .intercept(new ClientCompressorInterceptor())
196         .usePlaintext()
197         .build();
198     stub = TestServiceGrpc.newBlockingStub(channel);
199 
200     stub.unaryCall(REQUEST);
201 
202     if (clientAcceptEncoding && serverEncoding) {
203       assertEquals("fzip", clientResponseHeaders.get(MESSAGE_ENCODING_KEY));
204       if (enableServerMessageCompression) {
205         assertTrue(clientCodec.anyRead);
206         assertTrue(serverCodec.anyWritten);
207       } else {
208         assertFalse(clientCodec.anyRead);
209         assertFalse(serverCodec.anyWritten);
210       }
211     } else {
212       // Either identity or null is accepted.
213       assertThat(clientResponseHeaders.get(MESSAGE_ENCODING_KEY))
214           .isAnyOf(Codec.Identity.NONE.getMessageEncoding(), null);
215       assertFalse(clientCodec.anyRead);
216       assertFalse(serverCodec.anyWritten);
217     }
218 
219     if (serverAcceptEncoding) {
220       assertEqualsString("fzip", clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
221     } else {
222       assertNull(clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
223     }
224 
225     if (clientAcceptEncoding) {
226       assertEqualsString("fzip", serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
227     } else {
228       assertNull(serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
229     }
230 
231     // Second call, once the client knows what the server supports.
232     if (clientEncoding && serverAcceptEncoding) {
233       assertEquals("fzip", serverResponseHeaders.get(MESSAGE_ENCODING_KEY));
234       if (enableClientMessageCompression) {
235         assertTrue(clientCodec.anyWritten);
236         assertTrue(serverCodec.anyRead);
237       } else {
238         assertFalse(clientCodec.anyWritten);
239         assertFalse(serverCodec.anyRead);
240       }
241     } else {
242       assertNull(serverResponseHeaders.get(MESSAGE_ENCODING_KEY));
243       assertFalse(clientCodec.anyWritten);
244       assertFalse(serverCodec.anyRead);
245     }
246   }
247 
248   private static final class LocalServer extends TestServiceGrpc.TestServiceImplBase {
249     @Override
unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver)250     public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
251       responseObserver.onNext(SimpleResponse.newBuilder()
252           .setPayload(Payload.newBuilder()
253               .setBody(ByteString.copyFrom(new byte[]{127})))
254           .build());
255       responseObserver.onCompleted();
256     }
257   }
258 
259   private class ServerCompressorInterceptor implements ServerInterceptor {
260     @Override
interceptCall( ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next)261     public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
262         ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
263       if (serverEncoding) {
264         call.setCompression("fzip");
265       }
266       call.setMessageCompression(enableServerMessageCompression);
267       Metadata headersCopy = new Metadata();
268       headersCopy.merge(headers);
269       serverResponseHeaders = headersCopy;
270       return next.startCall(call, headers);
271     }
272   }
273 
274   private class ClientCompressorInterceptor implements ClientInterceptor {
275     @Override
interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)276     public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
277         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
278       if (clientEncoding && serverAcceptEncoding) {
279         callOptions = callOptions.withCompression("fzip");
280       }
281       ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
282 
283       return new ClientCompressor<ReqT, RespT>(call);
284     }
285   }
286 
287   private class ClientCompressor<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> {
ClientCompressor(ClientCall<ReqT, RespT> delegate)288     protected ClientCompressor(ClientCall<ReqT, RespT> delegate) {
289       super(delegate);
290     }
291 
292     @Override
start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers)293     public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) {
294       super.start(new ClientHeadersCapture<RespT>(responseListener), headers);
295       setMessageCompression(enableClientMessageCompression);
296     }
297   }
298 
299   private class ClientHeadersCapture<RespT> extends SimpleForwardingClientCallListener<RespT> {
ClientHeadersCapture(Listener<RespT> delegate)300     private ClientHeadersCapture(Listener<RespT> delegate) {
301       super(delegate);
302     }
303 
304     @Override
onHeaders(Metadata headers)305     public void onHeaders(Metadata headers) {
306       super.onHeaders(headers);
307       Metadata headersCopy = new Metadata();
308       headersCopy.merge(headers);
309       clientResponseHeaders = headersCopy;
310     }
311   }
312 
assertEqualsString(String expected, byte[] actual)313   private static void assertEqualsString(String expected, byte[] actual) {
314     assertEquals(expected, new String(actual, Charset.forName("US-ASCII")));
315   }
316 }
317