1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.stub; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.Preconditions; 24 import io.grpc.Metadata; 25 import io.grpc.MethodDescriptor; 26 import io.grpc.ServerCall; 27 import io.grpc.ServerCallHandler; 28 import io.grpc.Status; 29 30 /** 31 * Utility functions for adapting {@link ServerCallHandler}s to application service implementation, 32 * meant to be used by the generated code. 33 */ 34 public final class ServerCalls { 35 36 @VisibleForTesting 37 static final String TOO_MANY_REQUESTS = "Too many requests"; 38 @VisibleForTesting 39 static final String MISSING_REQUEST = "Half-closed without a request"; 40 ServerCalls()41 private ServerCalls() { 42 } 43 44 /** 45 * Creates a {@link ServerCallHandler} for a unary call method of the service. 46 * 47 * @param method an adaptor to the actual method on the service implementation. 48 */ asyncUnaryCall( UnaryMethod<ReqT, RespT> method)49 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall( 50 UnaryMethod<ReqT, RespT> method) { 51 return asyncUnaryRequestCall(method); 52 } 53 54 /** 55 * Creates a {@link ServerCallHandler} for a server streaming method of the service. 56 * 57 * @param method an adaptor to the actual method on the service implementation. 58 */ asyncServerStreamingCall( ServerStreamingMethod<ReqT, RespT> method)59 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall( 60 ServerStreamingMethod<ReqT, RespT> method) { 61 return asyncUnaryRequestCall(method); 62 } 63 64 /** 65 * Creates a {@link ServerCallHandler} for a client streaming method of the service. 66 * 67 * @param method an adaptor to the actual method on the service implementation. 68 */ asyncClientStreamingCall( ClientStreamingMethod<ReqT, RespT> method)69 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall( 70 ClientStreamingMethod<ReqT, RespT> method) { 71 return asyncStreamingRequestCall(method); 72 } 73 74 /** 75 * Creates a {@link ServerCallHandler} for a bidi streaming method of the service. 76 * 77 * @param method an adaptor to the actual method on the service implementation. 78 */ asyncBidiStreamingCall( BidiStreamingMethod<ReqT, RespT> method)79 public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall( 80 BidiStreamingMethod<ReqT, RespT> method) { 81 return asyncStreamingRequestCall(method); 82 } 83 84 /** 85 * Adaptor to a unary call method. 86 */ 87 public interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {} 88 89 /** 90 * Adaptor to a server streaming method. 91 */ 92 public interface ServerStreamingMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {} 93 94 /** 95 * Adaptor to a client streaming method. 96 */ 97 public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {} 98 99 /** 100 * Adaptor to a bidirectional streaming method. 101 */ 102 public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {} 103 104 private static final class UnaryServerCallHandler<ReqT, RespT> 105 implements ServerCallHandler<ReqT, RespT> { 106 107 private final UnaryRequestMethod<ReqT, RespT> method; 108 109 // Non private to avoid synthetic class UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method)110 UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method) { 111 this.method = method; 112 } 113 114 @Override startCall(ServerCall<ReqT, RespT> call, Metadata headers)115 public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) { 116 Preconditions.checkArgument( 117 call.getMethodDescriptor().getType().clientSendsOneMessage(), 118 "asyncUnaryRequestCall is only for clientSendsOneMessage methods"); 119 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = 120 new ServerCallStreamObserverImpl<ReqT, RespT>(call); 121 // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client 122 // sends more than 1 requests, ServerCall will catch it. Note that disabling auto 123 // inbound flow control has no effect on unary calls. 124 call.request(2); 125 return new UnaryServerCallListener(responseObserver, call); 126 } 127 128 private final class UnaryServerCallListener extends ServerCall.Listener<ReqT> { 129 private final ServerCall<ReqT, RespT> call; 130 private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver; 131 private boolean canInvoke = true; 132 private ReqT request; 133 134 // Non private to avoid synthetic class UnaryServerCallListener( ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)135 UnaryServerCallListener( 136 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, 137 ServerCall<ReqT, RespT> call) { 138 this.call = call; 139 this.responseObserver = responseObserver; 140 } 141 142 @Override onMessage(ReqT request)143 public void onMessage(ReqT request) { 144 if (this.request != null) { 145 // Safe to close the call, because the application has not yet been invoked 146 call.close( 147 Status.INTERNAL.withDescription(TOO_MANY_REQUESTS), 148 new Metadata()); 149 canInvoke = false; 150 return; 151 } 152 153 // We delay calling method.invoke() until onHalfClose() to make sure the client 154 // half-closes. 155 this.request = request; 156 } 157 158 @Override onHalfClose()159 public void onHalfClose() { 160 if (!canInvoke) { 161 return; 162 } 163 if (request == null) { 164 // Safe to close the call, because the application has not yet been invoked 165 call.close( 166 Status.INTERNAL.withDescription(MISSING_REQUEST), 167 new Metadata()); 168 return; 169 } 170 171 method.invoke(request, responseObserver); 172 responseObserver.freeze(); 173 if (call.isReady()) { 174 // Since we are calling invoke in halfClose we have missed the onReady 175 // event from the transport so recover it here. 176 onReady(); 177 } 178 } 179 180 @Override onCancel()181 public void onCancel() { 182 responseObserver.cancelled = true; 183 if (responseObserver.onCancelHandler != null) { 184 responseObserver.onCancelHandler.run(); 185 } 186 } 187 188 @Override onReady()189 public void onReady() { 190 if (responseObserver.onReadyHandler != null) { 191 responseObserver.onReadyHandler.run(); 192 } 193 } 194 } 195 } 196 197 /** 198 * Creates a {@link ServerCallHandler} for a unary request call method of the service. 199 * 200 * @param method an adaptor to the actual method on the service implementation. 201 */ asyncUnaryRequestCall( UnaryRequestMethod<ReqT, RespT> method)202 private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryRequestCall( 203 UnaryRequestMethod<ReqT, RespT> method) { 204 return new UnaryServerCallHandler<ReqT, RespT>(method); 205 } 206 207 private static final class StreamingServerCallHandler<ReqT, RespT> 208 implements ServerCallHandler<ReqT, RespT> { 209 210 private final StreamingRequestMethod<ReqT, RespT> method; 211 212 // Non private to avoid synthetic class StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method)213 StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method) { 214 this.method = method; 215 } 216 217 @Override startCall(ServerCall<ReqT, RespT> call, Metadata headers)218 public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) { 219 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = 220 new ServerCallStreamObserverImpl<ReqT, RespT>(call); 221 StreamObserver<ReqT> requestObserver = method.invoke(responseObserver); 222 responseObserver.freeze(); 223 if (responseObserver.autoFlowControlEnabled) { 224 call.request(1); 225 } 226 return new StreamingServerCallListener(requestObserver, responseObserver, call); 227 } 228 229 private final class StreamingServerCallListener extends ServerCall.Listener<ReqT> { 230 231 private final StreamObserver<ReqT> requestObserver; 232 private final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver; 233 private final ServerCall<ReqT, RespT> call; 234 private boolean halfClosed = false; 235 236 // Non private to avoid synthetic class StreamingServerCallListener( StreamObserver<ReqT> requestObserver, ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, ServerCall<ReqT, RespT> call)237 StreamingServerCallListener( 238 StreamObserver<ReqT> requestObserver, 239 ServerCallStreamObserverImpl<ReqT, RespT> responseObserver, 240 ServerCall<ReqT, RespT> call) { 241 this.requestObserver = requestObserver; 242 this.responseObserver = responseObserver; 243 this.call = call; 244 } 245 246 @Override onMessage(ReqT request)247 public void onMessage(ReqT request) { 248 requestObserver.onNext(request); 249 250 // Request delivery of the next inbound message. 251 if (responseObserver.autoFlowControlEnabled) { 252 call.request(1); 253 } 254 } 255 256 @Override onHalfClose()257 public void onHalfClose() { 258 halfClosed = true; 259 requestObserver.onCompleted(); 260 } 261 262 @Override onCancel()263 public void onCancel() { 264 responseObserver.cancelled = true; 265 if (responseObserver.onCancelHandler != null) { 266 responseObserver.onCancelHandler.run(); 267 } 268 if (!halfClosed) { 269 requestObserver.onError( 270 Status.CANCELLED 271 .withDescription("cancelled before receiving half close") 272 .asRuntimeException()); 273 } 274 } 275 276 @Override onReady()277 public void onReady() { 278 if (responseObserver.onReadyHandler != null) { 279 responseObserver.onReadyHandler.run(); 280 } 281 } 282 } 283 } 284 285 /** 286 * Creates a {@link ServerCallHandler} for a streaming request call method of the service. 287 * 288 * @param method an adaptor to the actual method on the service implementation. 289 */ asyncStreamingRequestCall( StreamingRequestMethod<ReqT, RespT> method)290 private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncStreamingRequestCall( 291 StreamingRequestMethod<ReqT, RespT> method) { 292 return new StreamingServerCallHandler<ReqT, RespT>(method); 293 } 294 295 private interface UnaryRequestMethod<ReqT, RespT> { invoke(ReqT request, StreamObserver<RespT> responseObserver)296 void invoke(ReqT request, StreamObserver<RespT> responseObserver); 297 } 298 299 private interface StreamingRequestMethod<ReqT, RespT> { invoke(StreamObserver<RespT> responseObserver)300 StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver); 301 } 302 303 private static final class ServerCallStreamObserverImpl<ReqT, RespT> 304 extends ServerCallStreamObserver<RespT> { 305 final ServerCall<ReqT, RespT> call; 306 volatile boolean cancelled; 307 private boolean frozen; 308 private boolean autoFlowControlEnabled = true; 309 private boolean sentHeaders; 310 private Runnable onReadyHandler; 311 private Runnable onCancelHandler; 312 313 // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call)314 ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) { 315 this.call = call; 316 } 317 freeze()318 private void freeze() { 319 this.frozen = true; 320 } 321 322 @Override setMessageCompression(boolean enable)323 public void setMessageCompression(boolean enable) { 324 call.setMessageCompression(enable); 325 } 326 327 @Override setCompression(String compression)328 public void setCompression(String compression) { 329 call.setCompression(compression); 330 } 331 332 @Override onNext(RespT response)333 public void onNext(RespT response) { 334 if (cancelled) { 335 throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); 336 } 337 if (!sentHeaders) { 338 call.sendHeaders(new Metadata()); 339 sentHeaders = true; 340 } 341 call.sendMessage(response); 342 } 343 344 @Override onError(Throwable t)345 public void onError(Throwable t) { 346 Metadata metadata = Status.trailersFromThrowable(t); 347 if (metadata == null) { 348 metadata = new Metadata(); 349 } 350 call.close(Status.fromThrowable(t), metadata); 351 } 352 353 @Override onCompleted()354 public void onCompleted() { 355 if (cancelled) { 356 throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); 357 } else { 358 call.close(Status.OK, new Metadata()); 359 } 360 } 361 362 @Override isReady()363 public boolean isReady() { 364 return call.isReady(); 365 } 366 367 @Override setOnReadyHandler(Runnable r)368 public void setOnReadyHandler(Runnable r) { 369 checkState(!frozen, "Cannot alter onReadyHandler after initialization"); 370 this.onReadyHandler = r; 371 } 372 373 @Override isCancelled()374 public boolean isCancelled() { 375 return call.isCancelled(); 376 } 377 378 @Override setOnCancelHandler(Runnable onCancelHandler)379 public void setOnCancelHandler(Runnable onCancelHandler) { 380 checkState(!frozen, "Cannot alter onCancelHandler after initialization"); 381 this.onCancelHandler = onCancelHandler; 382 } 383 384 @Override disableAutoInboundFlowControl()385 public void disableAutoInboundFlowControl() { 386 checkState(!frozen, "Cannot disable auto flow control after initialization"); 387 autoFlowControlEnabled = false; 388 } 389 390 @Override request(int count)391 public void request(int count) { 392 call.request(count); 393 } 394 } 395 396 /** 397 * Sets unimplemented status for method on given response stream for unary call. 398 * 399 * @param methodDescriptor of method for which error will be thrown. 400 * @param responseObserver on which error will be set. 401 */ asyncUnimplementedUnaryCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)402 public static void asyncUnimplementedUnaryCall( 403 MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) { 404 checkNotNull(methodDescriptor, "methodDescriptor"); 405 checkNotNull(responseObserver, "responseObserver"); 406 responseObserver.onError(Status.UNIMPLEMENTED 407 .withDescription(String.format("Method %s is unimplemented", 408 methodDescriptor.getFullMethodName())) 409 .asRuntimeException()); 410 } 411 412 /** 413 * Sets unimplemented status for streaming call. 414 * 415 * @param methodDescriptor of method for which error will be thrown. 416 * @param responseObserver on which error will be set. 417 */ asyncUnimplementedStreamingCall( MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver)418 public static <T> StreamObserver<T> asyncUnimplementedStreamingCall( 419 MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) { 420 // NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error 421 // on responseObserver and then return no-op observer. 422 asyncUnimplementedUnaryCall(methodDescriptor, responseObserver); 423 return new NoopStreamObserver<T>(); 424 } 425 426 /** 427 * No-op implementation of StreamObserver. Used in abstract stubs for default implementations of 428 * methods which throws UNIMPLEMENTED error and tests. 429 */ 430 static class NoopStreamObserver<V> implements StreamObserver<V> { 431 @Override onNext(V value)432 public void onNext(V value) { 433 } 434 435 @Override onError(Throwable t)436 public void onError(Throwable t) { 437 } 438 439 @Override onCompleted()440 public void onCompleted() { 441 } 442 } 443 } 444