1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.stub;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import io.grpc.Metadata;
25 import io.grpc.MethodDescriptor;
26 import io.grpc.ServerCall;
27 import io.grpc.ServerCallHandler;
28 import io.grpc.Status;
29 
30 /**
31  * Utility functions for adapting {@link ServerCallHandler}s to application service implementation,
32  * meant to be used by the generated code.
33  */
34 public final class ServerCalls {
35 
36   @VisibleForTesting
37   static final String TOO_MANY_REQUESTS = "Too many requests";
38   @VisibleForTesting
39   static final String MISSING_REQUEST = "Half-closed without a request";
40 
ServerCalls()41   private ServerCalls() {
42   }
43 
44   /**
45    * Creates a {@link ServerCallHandler} for a unary call method of the service.
46    *
47    * @param method an adaptor to the actual method on the service implementation.
48    */
asyncUnaryCall( UnaryMethod<ReqT, RespT> method)49   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
50       UnaryMethod<ReqT, RespT> method) {
51     return asyncUnaryRequestCall(method);
52   }
53 
54   /**
55    * Creates a {@link ServerCallHandler} for a server streaming method of the service.
56    *
57    * @param method an adaptor to the actual method on the service implementation.
58    */
asyncServerStreamingCall( ServerStreamingMethod<ReqT, RespT> method)59   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
60       ServerStreamingMethod<ReqT, RespT> method) {
61     return asyncUnaryRequestCall(method);
62   }
63 
64   /**
65    * Creates a {@link ServerCallHandler} for a client streaming method of the service.
66    *
67    * @param method an adaptor to the actual method on the service implementation.
68    */
asyncClientStreamingCall( ClientStreamingMethod<ReqT, RespT> method)69   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
70       ClientStreamingMethod<ReqT, RespT> method) {
71     return asyncStreamingRequestCall(method);
72   }
73 
74   /**
75    * Creates a {@link ServerCallHandler} for a bidi streaming method of the service.
76    *
77    * @param method an adaptor to the actual method on the service implementation.
78    */
asyncBidiStreamingCall( BidiStreamingMethod<ReqT, RespT> method)79   public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
80       BidiStreamingMethod<ReqT, RespT> method) {
81     return asyncStreamingRequestCall(method);
82   }
83 
84   /**
85    * Adaptor to a unary call method.
86    */
87   public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {}
88 
89   /**
90    * Adaptor to a server streaming method.
91    */
92   public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {}
93 
94   /**
95    * Adaptor to a client streaming method.
96    */
97   public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
98 
99   /**
100    * Adaptor to a bidirectional streaming method.
101    */
102   public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
103 
104   private static final class UnaryServerCallHandler<ReqT, RespT>
105       implements ServerCallHandler<ReqT, RespT> {
106 
107     private final UnaryRequestMethod<ReqT, RespT> method;
108 
109     // Non private to avoid synthetic class
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method)110     UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method) {
111       this.method = method;
112     }
113 
114     @Override
startCall(ServerCall<ReqT, RespT> call, Metadata headers)115     public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
116       Preconditions.checkArgument(
117           call.getMethodDescriptor().getType().clientSendsOneMessage(),
118           "asyncUnaryRequestCall is only for clientSendsOneMessage methods");
119       ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
120           new ServerCallStreamObserverImpl<ReqT, RespT>(call);
121       // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
122       // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
123       // inbound flow control has no effect on unary calls.
124       call.request(2);
125       return new UnaryServerCallListener(responseObserver, call);
126     }
127 
128     private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> {
129       private final ServerCall<ReqT, RespT> call;
130       private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
131       private boolean canInvoke = true;
132       private ReqT request;
133 
134       // Non private to avoid synthetic class
UnaryServerCallListener( ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)135       UnaryServerCallListener(
136           ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
137           ServerCall<ReqT, RespT> call) {
138         this.call = call;
139         this.responseObserver = responseObserver;
140       }
141 
142       @Override
onMessage(ReqT request)143       public void onMessage(ReqT request) {
144         if (this.request != null) {
145           // Safe to close the call, because the application has not yet been invoked
146           call.close(
147               Status.INTERNAL.withDescription(TOO_MANY_REQUESTS),
148               new Metadata());
149           canInvoke = false;
150           return;
151         }
152 
153         // We delay calling method.invoke() until onHalfClose() to make sure the client
154         // half-closes.
155         this.request = request;
156       }
157 
158       @Override
onHalfClose()159       public void onHalfClose() {
160         if (!canInvoke) {
161           return;
162         }
163         if (request == null) {
164           // Safe to close the call, because the application has not yet been invoked
165           call.close(
166               Status.INTERNAL.withDescription(MISSING_REQUEST),
167               new Metadata());
168           return;
169         }
170 
171         method.invoke(request, responseObserver);
172         responseObserver.freeze();
173         if (call.isReady()) {
174           // Since we are calling invoke in halfClose we have missed the onReady
175           // event from the transport so recover it here.
176           onReady();
177         }
178       }
179 
180       @Override
onCancel()181       public void onCancel() {
182         responseObserver.cancelled = true;
183         if (responseObserver.onCancelHandler != null) {
184           responseObserver.onCancelHandler.run();
185         }
186       }
187 
188       @Override
onReady()189       public void onReady() {
190         if (responseObserver.onReadyHandler != null) {
191           responseObserver.onReadyHandler.run();
192         }
193       }
194     }
195   }
196 
197   /**
198    * Creates a {@link ServerCallHandler} for a unary request call method of the service.
199    *
200    * @param method an adaptor to the actual method on the service implementation.
201    */
asyncUnaryRequestCall( UnaryRequestMethod<ReqT, RespT> method)202   private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryRequestCall(
203       UnaryRequestMethod<ReqT, RespT> method) {
204     return new UnaryServerCallHandler<ReqT, RespT>(method);
205   }
206 
207   private static final class StreamingServerCallHandler<ReqT, RespT>
208       implements ServerCallHandler<ReqT, RespT> {
209 
210     private final StreamingRequestMethod<ReqT, RespT> method;
211 
212     // Non private to avoid synthetic class
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method)213     StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method) {
214       this.method = method;
215     }
216 
217     @Override
startCall(ServerCall<ReqT, RespT> call, Metadata headers)218     public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
219       ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
220           new ServerCallStreamObserverImpl<ReqT, RespT>(call);
221       StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
222       responseObserver.freeze();
223       if (responseObserver.autoFlowControlEnabled) {
224         call.request(1);
225       }
226       return new StreamingServerCallListener(requestObserver, responseObserver, call);
227     }
228 
229     private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> {
230 
231       private final StreamObserver<ReqT> requestObserver;
232       private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver;
233       private final ServerCall<ReqT, RespT> call;
234       private boolean halfClosed = false;
235 
236       // Non private to avoid synthetic class
StreamingServerCallListener( StreamObserver<ReqT> requestObserver, ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)237       StreamingServerCallListener(
238           StreamObserver<ReqT> requestObserver,
239           ServerCallStreamObserverImpl<ReqT, RespT> responseObserver,
240           ServerCall<ReqT, RespT> call) {
241         this.requestObserver = requestObserver;
242         this.responseObserver = responseObserver;
243         this.call = call;
244       }
245 
246       @Override
onMessage(ReqT request)247       public void onMessage(ReqT request) {
248         requestObserver.onNext(request);
249 
250         // Request delivery of the next inbound message.
251         if (responseObserver.autoFlowControlEnabled) {
252           call.request(1);
253         }
254       }
255 
256       @Override
onHalfClose()257       public void onHalfClose() {
258         halfClosed = true;
259         requestObserver.onCompleted();
260       }
261 
262       @Override
onCancel()263       public void onCancel() {
264         responseObserver.cancelled = true;
265         if (responseObserver.onCancelHandler != null) {
266           responseObserver.onCancelHandler.run();
267         }
268         if (!halfClosed) {
269           requestObserver.onError(
270               Status.CANCELLED
271                   .withDescription("cancelled before receiving half close")
272                   .asRuntimeException());
273         }
274       }
275 
276       @Override
onReady()277       public void onReady() {
278         if (responseObserver.onReadyHandler != null) {
279           responseObserver.onReadyHandler.run();
280         }
281       }
282     }
283   }
284 
285   /**
286    * Creates a {@link ServerCallHandler} for a streaming request call method of the service.
287    *
288    * @param method an adaptor to the actual method on the service implementation.
289    */
asyncStreamingRequestCall( StreamingRequestMethod<ReqT, RespT> method)290   private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncStreamingRequestCall(
291       StreamingRequestMethod<ReqT, RespT> method) {
292     return new StreamingServerCallHandler<ReqT, RespT>(method);
293   }
294 
295   private interface UnaryRequestMethod<ReqT, RespT> {
invoke(ReqT request, StreamObserver<RespT> responseObserver)296     void invoke(ReqT request, StreamObserver<RespT> responseObserver);
297   }
298 
299   private interface StreamingRequestMethod<ReqT, RespT> {
invoke(StreamObserver<RespT> responseObserver)300     StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
301   }
302 
303   private static final class ServerCallStreamObserverImpl<ReqT, RespT>
304       extends ServerCallStreamObserver<RespT> {
305     final ServerCall<ReqT, RespT> call;
306     volatile boolean cancelled;
307     private boolean frozen;
308     private boolean autoFlowControlEnabled = true;
309     private boolean sentHeaders;
310     private Runnable onReadyHandler;
311     private Runnable onCancelHandler;
312 
313     // Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call)314     ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) {
315       this.call = call;
316     }
317 
freeze()318     private void freeze() {
319       this.frozen = true;
320     }
321 
322     @Override
setMessageCompression(boolean enable)323     public void setMessageCompression(boolean enable) {
324       call.setMessageCompression(enable);
325     }
326 
327     @Override
setCompression(String compression)328     public void setCompression(String compression) {
329       call.setCompression(compression);
330     }
331 
332     @Override
onNext(RespT response)333     public void onNext(RespT response) {
334       if (cancelled) {
335         throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
336       }
337       if (!sentHeaders) {
338         call.sendHeaders(new Metadata());
339         sentHeaders = true;
340       }
341       call.sendMessage(response);
342     }
343 
344     @Override
onError(Throwable t)345     public void onError(Throwable t) {
346       Metadata metadata = Status.trailersFromThrowable(t);
347       if (metadata == null) {
348         metadata = new Metadata();
349       }
350       call.close(Status.fromThrowable(t), metadata);
351     }
352 
353     @Override
onCompleted()354     public void onCompleted() {
355       if (cancelled) {
356         throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
357       } else {
358         call.close(Status.OK, new Metadata());
359       }
360     }
361 
362     @Override
isReady()363     public boolean isReady() {
364       return call.isReady();
365     }
366 
367     @Override
setOnReadyHandler(Runnable r)368     public void setOnReadyHandler(Runnable r) {
369       checkState(!frozen, "Cannot alter onReadyHandler after initialization");
370       this.onReadyHandler = r;
371     }
372 
373     @Override
isCancelled()374     public boolean isCancelled() {
375       return call.isCancelled();
376     }
377 
378     @Override
setOnCancelHandler(Runnable onCancelHandler)379     public void setOnCancelHandler(Runnable onCancelHandler) {
380       checkState(!frozen, "Cannot alter onCancelHandler after initialization");
381       this.onCancelHandler = onCancelHandler;
382     }
383 
384     @Override
disableAutoInboundFlowControl()385     public void disableAutoInboundFlowControl() {
386       checkState(!frozen, "Cannot disable auto flow control after initialization");
387       autoFlowControlEnabled = false;
388     }
389 
390     @Override
request(int count)391     public void request(int count) {
392       call.request(count);
393     }
394   }
395 
396   /**
397    * Sets unimplemented status for method on given response stream for unary call.
398    *
399    * @param methodDescriptor of method for which error will be thrown.
400    * @param responseObserver on which error will be set.
401    */
asyncUnimplementedUnaryCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)402   public static void asyncUnimplementedUnaryCall(
403       MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
404     checkNotNull(methodDescriptor, "methodDescriptor");
405     checkNotNull(responseObserver, "responseObserver");
406     responseObserver.onError(Status.UNIMPLEMENTED
407         .withDescription(String.format("Method %s is unimplemented",
408             methodDescriptor.getFullMethodName()))
409         .asRuntimeException());
410   }
411 
412   /**
413    * Sets unimplemented status for streaming call.
414    *
415    * @param methodDescriptor of method for which error will be thrown.
416    * @param responseObserver on which error will be set.
417    */
asyncUnimplementedStreamingCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)418   public static <T> StreamObserver<T> asyncUnimplementedStreamingCall(
419       MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
420     // NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
421     // on responseObserver and then return no-op observer.
422     asyncUnimplementedUnaryCall(methodDescriptor, responseObserver);
423     return new NoopStreamObserver<T>();
424   }
425 
426   /**
427    * No-op implementation of StreamObserver. Used in abstract stubs for default implementations of
428    * methods which throws UNIMPLEMENTED error and tests.
429    */
430   static class NoopStreamObserver<V> implements StreamObserver<V> {
431     @Override
onNext(V value)432     public void onNext(V value) {
433     }
434 
435     @Override
onError(Throwable t)436     public void onError(Throwable t) {
437     }
438 
439     @Override
onCompleted()440     public void onCompleted() {
441     }
442   }
443 }
444