1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Queues;
21 import com.google.protobuf.ByteString;
22 import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
23 import io.grpc.Metadata;
24 import io.grpc.ServerCall;
25 import io.grpc.ServerCallHandler;
26 import io.grpc.ServerInterceptor;
27 import io.grpc.Status;
28 import io.grpc.internal.LogExceptionRunnable;
29 import io.grpc.stub.ServerCallStreamObserver;
30 import io.grpc.stub.StreamObserver;
31 import io.grpc.testing.integration.Messages.Payload;
32 import io.grpc.testing.integration.Messages.PayloadType;
33 import io.grpc.testing.integration.Messages.ResponseParameters;
34 import io.grpc.testing.integration.Messages.SimpleRequest;
35 import io.grpc.testing.integration.Messages.SimpleResponse;
36 import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
37 import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
38 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
39 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
40 import java.io.IOException;
41 import java.io.InputStream;
42 import java.util.ArrayDeque;
43 import java.util.Arrays;
44 import java.util.HashSet;
45 import java.util.List;
46 import java.util.Queue;
47 import java.util.Random;
48 import java.util.Set;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.ScheduledExecutorService;
51 import java.util.concurrent.TimeUnit;
52 import javax.annotation.concurrent.GuardedBy;
53 
54 /**
55  * Implementation of the business logic for the TestService. Uses an executor to schedule chunks
56  * sent in response streams.
57  */
58 public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
59   private static final String UNCOMPRESSABLE_FILE =
60       "/io/grpc/testing/integration/testdata/uncompressable.bin";
61   private final Random random = new Random();
62 
63   private final ScheduledExecutorService executor;
64   private final ByteString uncompressableBuffer;
65   private final ByteString compressableBuffer;
66 
67   /**
68    * Constructs a controller using the given executor for scheduling response stream chunks.
69    */
TestServiceImpl(ScheduledExecutorService executor)70   public TestServiceImpl(ScheduledExecutorService executor) {
71     this.executor = executor;
72     this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
73     this.uncompressableBuffer = createBufferFromFile(UNCOMPRESSABLE_FILE);
74   }
75 
76   @Override
emptyCall(EmptyProtos.Empty empty, StreamObserver<EmptyProtos.Empty> responseObserver)77   public void emptyCall(EmptyProtos.Empty empty,
78       StreamObserver<EmptyProtos.Empty> responseObserver) {
79     responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
80     responseObserver.onCompleted();
81   }
82 
83   /**
84    * Immediately responds with a payload of the type and size specified in the request.
85    */
86   @Override
unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver)87   public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
88     ServerCallStreamObserver<SimpleResponse> obs =
89         (ServerCallStreamObserver<SimpleResponse>) responseObserver;
90     SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
91     try {
92       if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
93         obs.setCompression("gzip");
94       } else {
95         obs.setCompression("identity");
96       }
97     } catch (IllegalArgumentException e) {
98       obs.onError(Status.UNIMPLEMENTED
99           .withDescription("compression not supported.")
100           .withCause(e)
101           .asRuntimeException());
102       return;
103     }
104 
105     if (req.getResponseSize() != 0) {
106       boolean compressable = compressableResponse(req.getResponseType());
107       ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
108       // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
109       // TODO(wonderfly): whether or not this is a good approach needs further discussion.
110       int offset = random.nextInt(
111           compressable ? compressableBuffer.size() : uncompressableBuffer.size());
112       ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());
113       responseBuilder.setPayload(
114           Payload.newBuilder()
115               .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
116               .setBody(payload));
117     }
118 
119     if (req.hasResponseStatus()) {
120       obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
121           .withDescription(req.getResponseStatus().getMessage())
122           .asRuntimeException());
123       return;
124     }
125 
126     responseObserver.onNext(responseBuilder.build());
127     responseObserver.onCompleted();
128   }
129 
130   /**
131    * Given a request that specifies chunk size and interval between responses, creates and schedules
132    * the response stream.
133    */
134   @Override
streamingOutputCall(StreamingOutputCallRequest request, StreamObserver<StreamingOutputCallResponse> responseObserver)135   public void streamingOutputCall(StreamingOutputCallRequest request,
136       StreamObserver<StreamingOutputCallResponse> responseObserver) {
137     // Create and start the response dispatcher.
138     new ResponseDispatcher(responseObserver).enqueue(toChunkQueue(request)).completeInput();
139   }
140 
141   /**
142    * Waits until we have received all of the request messages and then returns the aggregate payload
143    * size for all of the received requests.
144    */
145   @Override
streamingInputCall( final StreamObserver<Messages.StreamingInputCallResponse> responseObserver)146   public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall(
147       final StreamObserver<Messages.StreamingInputCallResponse> responseObserver) {
148     return new StreamObserver<StreamingInputCallRequest>() {
149       private int totalPayloadSize;
150 
151       @Override
152       public void onNext(StreamingInputCallRequest message) {
153         totalPayloadSize += message.getPayload().getBody().size();
154       }
155 
156       @Override
157       public void onCompleted() {
158         responseObserver.onNext(StreamingInputCallResponse.newBuilder()
159             .setAggregatedPayloadSize(totalPayloadSize).build());
160         responseObserver.onCompleted();
161       }
162 
163       @Override
164       public void onError(Throwable cause) {
165         responseObserver.onError(cause);
166       }
167     };
168   }
169 
170   /**
171    * True bi-directional streaming. Processes requests as they come in. Begins streaming results
172    * immediately.
173    */
174   @Override
fullDuplexCall( final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver)175   public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
176       final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
177     final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
178     return new StreamObserver<StreamingOutputCallRequest>() {
179       @Override
180       public void onNext(StreamingOutputCallRequest request) {
181         if (request.hasResponseStatus()) {
182           dispatcher.cancel();
183           dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
184               .withDescription(request.getResponseStatus().getMessage())
185               .asRuntimeException());
186           return;
187         }
188         dispatcher.enqueue(toChunkQueue(request));
189       }
190 
191       @Override
192       public void onCompleted() {
193         if (!dispatcher.isCancelled()) {
194           // Tell the dispatcher that all input has been received.
195           dispatcher.completeInput();
196         }
197       }
198 
199       @Override
200       public void onError(Throwable cause) {
201         dispatcher.onError(cause);
202       }
203     };
204   }
205 
206   /**
207    * Similar to {@link #fullDuplexCall}, except that it waits for all streaming requests to be
208    * received before starting the streaming responses.
209    */
210   @Override
211   public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
212       final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
213     final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
214     final Queue<Chunk> chunks = new ArrayDeque<Chunk>();
215     return new StreamObserver<StreamingOutputCallRequest>() {
216       @Override
217       public void onNext(StreamingOutputCallRequest request) {
218         chunks.addAll(toChunkQueue(request));
219       }
220 
221       @Override
222       public void onCompleted() {
223         // Dispatch all of the chunks in one shot.
224         dispatcher.enqueue(chunks).completeInput();
225       }
226 
227       @Override
228       public void onError(Throwable cause) {
229         dispatcher.onError(cause);
230       }
231     };
232   }
233 
234   /**
235    * Schedules the dispatch of a queue of chunks. Whenever chunks are added or input is completed,
236    * the next response chunk is scheduled for delivery to the client. When no more chunks are
237    * available, the stream is half-closed.
238    */
239   private class ResponseDispatcher {
240     private final Chunk completionChunk = new Chunk(0, 0, 0, false);
241     private final Queue<Chunk> chunks;
242     private final StreamObserver<StreamingOutputCallResponse> responseStream;
243     private boolean scheduled;
244     @GuardedBy("this") private boolean cancelled;
245     private Throwable failure;
246     private Runnable dispatchTask = new Runnable() {
247       @Override
248       public void run() {
249         try {
250 
251           // Dispatch the current chunk to the client.
252           try {
253             dispatchChunk();
254           } catch (RuntimeException e) {
255             // Indicate that nothing is scheduled and re-throw.
256             synchronized (ResponseDispatcher.this) {
257               scheduled = false;
258             }
259             throw e;
260           }
261 
262           // Schedule the next chunk if there is one.
263           synchronized (ResponseDispatcher.this) {
264             // Indicate that nothing is scheduled.
265             scheduled = false;
266             scheduleNextChunk();
267           }
268         } catch (Throwable t) {
269           t.printStackTrace();
270         }
271       }
272     };
273 
274     /**
275      * The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to
276      * {@link StreamObserver} must be synchronized across threads, no further calls should be made
277      * directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}.
278      */
279     public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
280       this.chunks = Queues.newLinkedBlockingQueue();
281       this.responseStream = responseStream;
282     }
283 
284     /**
285      * Adds the given chunks to the response stream and schedules the next chunk to be delivered if
286      * needed.
287      */
288     public synchronized ResponseDispatcher enqueue(Queue<Chunk> moreChunks) {
289       assertNotFailed();
290       chunks.addAll(moreChunks);
291       scheduleNextChunk();
292       return this;
293     }
294 
295     /**
296      * Indicates that the input is completed and the currently enqueued response chunks are all that
297      * remain to be scheduled for dispatch to the client.
298      */
299     public ResponseDispatcher completeInput() {
300       assertNotFailed();
301       chunks.add(completionChunk);
302       scheduleNextChunk();
303       return this;
304     }
305 
306     /**
307      * Allows the service to cancel the remaining responses.
308      */
309     public synchronized void cancel() {
310       Preconditions.checkState(!cancelled, "Dispatcher already cancelled");
311       chunks.clear();
312       cancelled = true;
313     }
314 
315     public synchronized boolean isCancelled() {
316       return cancelled;
317     }
318 
319     private synchronized void onError(Throwable cause) {
320       responseStream.onError(cause);
321     }
322 
323     /**
324      * Dispatches the current response chunk to the client. This is only called by the executor. At
325      * any time, a given dispatch task should only be registered with the executor once.
326      */
327     private synchronized void dispatchChunk() {
328       if (cancelled) {
329         return;
330       }
331       try {
332         // Pop off the next chunk and send it to the client.
333         Chunk chunk = chunks.remove();
334         if (chunk == completionChunk) {
335           responseStream.onCompleted();
336         } else {
337           responseStream.onNext(chunk.toResponse());
338         }
339       } catch (Throwable e) {
340         failure = e;
341         if (Status.fromThrowable(e).getCode() == Status.CANCELLED.getCode()) {
342           // Stream was cancelled by client, responseStream.onError() might be called already or
343           // will be called soon by inbounding StreamObserver.
344           chunks.clear();
345         } else {
346           responseStream.onError(e);
347         }
348       }
349     }
350 
351     /**
352      * Schedules the next response chunk to be dispatched. If all input has been received and there
353      * are no more chunks in the queue, the stream is closed.
354      */
355     private void scheduleNextChunk() {
356       synchronized (this) {
357         if (scheduled) {
358           // Dispatch task is already scheduled.
359           return;
360         }
361 
362         // Schedule the next response chunk if there is one.
363         Chunk nextChunk = chunks.peek();
364         if (nextChunk != null) {
365           scheduled = true;
366           // TODO(ejona): cancel future if RPC is cancelled
367           Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),
368               nextChunk.delayMicroseconds, TimeUnit.MICROSECONDS);
369           return;
370         }
371       }
372     }
373 
374     private void assertNotFailed() {
375       if (failure != null) {
376         throw new IllegalStateException("Stream already failed", failure);
377       }
378     }
379   }
380 
381   /**
382    * Breaks down the request and creates a queue of response chunks for the given request.
383    */
384   public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) {
385     Queue<Chunk> chunkQueue = new ArrayDeque<Chunk>();
386     int offset = 0;
387     boolean compressable = compressableResponse(request.getResponseType());
388     for (ResponseParameters params : request.getResponseParametersList()) {
389       chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize(), compressable));
390 
391       // Increment the offset past this chunk.
392       // Both buffers need to be circular.
393       offset = (offset + params.getSize())
394           % (compressable ? compressableBuffer.size() : uncompressableBuffer.size());
395     }
396     return chunkQueue;
397   }
398 
399   /**
400    * A single chunk of a response stream. Contains delivery information for the dispatcher and can
401    * be converted to a streaming response proto. A chunk just references it's payload in the
402    * {@link #uncompressableBuffer} array. The payload isn't actually created until {@link
403    * #toResponse()} is called.
404    */
405   private class Chunk {
406     private final int delayMicroseconds;
407     private final int offset;
408     private final int length;
409     private final boolean compressable;
410 
411     public Chunk(int delayMicroseconds, int offset, int length, boolean compressable) {
412       this.delayMicroseconds = delayMicroseconds;
413       this.offset = offset;
414       this.length = length;
415       this.compressable = compressable;
416     }
417 
418     /**
419      * Convert this chunk into a streaming response proto.
420      */
421     private StreamingOutputCallResponse toResponse() {
422       StreamingOutputCallResponse.Builder responseBuilder =
423           StreamingOutputCallResponse.newBuilder();
424       ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
425       ByteString payload = generatePayload(dataBuffer, offset, length);
426       responseBuilder.setPayload(
427           Payload.newBuilder()
428               .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
429               .setBody(payload));
430       return responseBuilder.build();
431     }
432   }
433 
434   /**
435    * Creates a buffer with data read from a file.
436    */
437   @SuppressWarnings("Finally") // Not concerned about suppression; expected to be exceedingly rare
438   private ByteString createBufferFromFile(String fileClassPath) {
439     ByteString buffer = ByteString.EMPTY;
440     InputStream inputStream = getClass().getResourceAsStream(fileClassPath);
441     if (inputStream == null) {
442       throw new IllegalArgumentException("Unable to locate file on classpath: " + fileClassPath);
443     }
444 
445     try {
446       buffer = ByteString.readFrom(inputStream);
447     } catch (IOException e) {
448       throw new RuntimeException(e);
449     } finally {
450       try {
451         inputStream.close();
452       } catch (IOException ignorable) {
453         // ignore
454       }
455     }
456     return buffer;
457   }
458 
459   /**
460    * Indicates whether or not the response for this type should be compressable. If {@code RANDOM},
461    * picks a random boolean.
462    */
463   private boolean compressableResponse(PayloadType responseType) {
464     switch (responseType) {
465       case COMPRESSABLE:
466         return true;
467       case RANDOM:
468         return random.nextBoolean();
469       case UNCOMPRESSABLE:
470       default:
471         return false;
472     }
473   }
474 
475   /**
476    * Generates a payload of desired type and size. Reads compressableBuffer or
477    * uncompressableBuffer as a circular buffer.
478    */
479   private ByteString generatePayload(ByteString dataBuffer, int offset, int size) {
480     ByteString payload = ByteString.EMPTY;
481     // This offset would never pass the array boundary.
482     int begin = offset;
483     int end = 0;
484     int bytesLeft = size;
485     while (bytesLeft > 0) {
486       end = Math.min(begin + bytesLeft, dataBuffer.size());
487       // ByteString.substring returns the substring from begin, inclusive, to end, exclusive.
488       payload = payload.concat(dataBuffer.substring(begin, end));
489       bytesLeft -= (end - begin);
490       begin = end % dataBuffer.size();
491     }
492     return payload;
493   }
494 
495   /** Returns interceptors necessary for full service implementation. */
496   public static List<ServerInterceptor> interceptors() {
497     return Arrays.asList(
498         echoRequestHeadersInterceptor(Util.METADATA_KEY),
499         echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY),
500         echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY));
501   }
502 
503   /**
504    * Echo the request headers from a client into response headers and trailers. Useful for
505    * testing end-to-end metadata propagation.
506    */
507   private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
508     final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
509     return new ServerInterceptor() {
510       @Override
511       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
512           ServerCall<ReqT, RespT> call,
513           final Metadata requestHeaders,
514           ServerCallHandler<ReqT, RespT> next) {
515         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
516               @Override
517               public void sendHeaders(Metadata responseHeaders) {
518                 responseHeaders.merge(requestHeaders, keySet);
519                 super.sendHeaders(responseHeaders);
520               }
521 
522               @Override
523               public void close(Status status, Metadata trailers) {
524                 trailers.merge(requestHeaders, keySet);
525                 super.close(status, trailers);
526               }
527             }, requestHeaders);
528       }
529     };
530   }
531 
532   /**
533    * Echoes request headers with the specified key(s) from a client into response headers only.
534    */
535   private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
536     final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
537     return new ServerInterceptor() {
538       @Override
539       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
540           ServerCall<ReqT, RespT> call,
541           final Metadata requestHeaders,
542           ServerCallHandler<ReqT, RespT> next) {
543         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
544           @Override
545           public void sendHeaders(Metadata responseHeaders) {
546             responseHeaders.merge(requestHeaders, keySet);
547             super.sendHeaders(responseHeaders);
548           }
549 
550           @Override
551           public void close(Status status, Metadata trailers) {
552             super.close(status, trailers);
553           }
554         }, requestHeaders);
555       }
556     };
557   }
558 
559   /**
560    * Echoes request headers with the specified key(s) from a client into response trailers only.
561    */
562   private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
563     final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
564     return new ServerInterceptor() {
565       @Override
566       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
567           ServerCall<ReqT, RespT> call,
568           final Metadata requestHeaders,
569           ServerCallHandler<ReqT, RespT> next) {
570         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
571           @Override
572           public void sendHeaders(Metadata responseHeaders) {
573             super.sendHeaders(responseHeaders);
574           }
575 
576           @Override
577           public void close(Status status, Metadata trailers) {
578             trailers.merge(requestHeaders, keySet);
579             super.close(status, trailers);
580           }
581         }, requestHeaders);
582       }
583     };
584   }
585 }
586