/* * Copyright 2014 The gRPC Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.grpc.stub; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; /** * Utility functions for processing different call idioms. We have one-to-one correspondence * between utilities in this class and the potential signatures in a generated stub class so * that the runtime can vary behavior without requiring regeneration of the stub. */ public final class ClientCalls { private static final Logger logger = Logger.getLogger(ClientCalls.class.getName()); // Prevent instantiation private ClientCalls() {} /** * Executes a unary call with a response {@link StreamObserver}. The {@code call} should not be * already started. After calling this method, {@code call} should no longer be used. */ public static void asyncUnaryCall( ClientCall call, ReqT req, StreamObserver responseObserver) { asyncUnaryRequestCall(call, req, responseObserver, false); } /** * Executes a server-streaming call with a response {@link StreamObserver}. The {@code call} * should not be already started. After calling this method, {@code call} should no longer be * used. */ public static void asyncServerStreamingCall( ClientCall call, ReqT req, StreamObserver responseObserver) { asyncUnaryRequestCall(call, req, responseObserver, true); } /** * Executes a client-streaming call returning a {@link StreamObserver} for the request messages. * The {@code call} should not be already started. After calling this method, {@code call} * should no longer be used. * * @return request stream observer. */ public static StreamObserver asyncClientStreamingCall( ClientCall call, StreamObserver responseObserver) { return asyncStreamingRequestCall(call, responseObserver, false); } /** * Executes a bidirectional-streaming call. The {@code call} should not be already started. * After calling this method, {@code call} should no longer be used. * * @return request stream observer. */ public static StreamObserver asyncBidiStreamingCall( ClientCall call, StreamObserver responseObserver) { return asyncStreamingRequestCall(call, responseObserver, true); } /** * Executes a unary call and blocks on the response. The {@code call} should not be already * started. After calling this method, {@code call} should no longer be used. * * @return the single response message. */ public static RespT blockingUnaryCall(ClientCall call, ReqT req) { try { return getUnchecked(futureUnaryCall(call, req)); } catch (RuntimeException e) { throw cancelThrow(call, e); } catch (Error e) { throw cancelThrow(call, e); } } /** * Executes a unary call and blocks on the response. The {@code call} should not be already * started. After calling this method, {@code call} should no longer be used. * * @return the single response message. */ public static RespT blockingUnaryCall( Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT req) { ThreadlessExecutor executor = new ThreadlessExecutor(); ClientCall call = channel.newCall(method, callOptions.withExecutor(executor)); try { ListenableFuture responseFuture = futureUnaryCall(call, req); while (!responseFuture.isDone()) { try { executor.waitAndDrain(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Status.CANCELLED .withDescription("Call was interrupted") .withCause(e) .asRuntimeException(); } } return getUnchecked(responseFuture); } catch (RuntimeException e) { throw cancelThrow(call, e); } catch (Error e) { throw cancelThrow(call, e); } } /** * Executes a server-streaming call returning a blocking {@link Iterator} over the * response stream. The {@code call} should not be already started. After calling this method, * {@code call} should no longer be used. * * @return an iterator over the response stream. */ // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. public static Iterator blockingServerStreamingCall( ClientCall call, ReqT req) { BlockingResponseStream result = new BlockingResponseStream(call); asyncUnaryRequestCall(call, req, result.listener(), true); return result; } /** * Executes a server-streaming call returning a blocking {@link Iterator} over the * response stream. The {@code call} should not be already started. After calling this method, * {@code call} should no longer be used. * * @return an iterator over the response stream. */ // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. public static Iterator blockingServerStreamingCall( Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT req) { ThreadlessExecutor executor = new ThreadlessExecutor(); ClientCall call = channel.newCall(method, callOptions.withExecutor(executor)); BlockingResponseStream result = new BlockingResponseStream(call, executor); asyncUnaryRequestCall(call, req, result.listener(), true); return result; } /** * Executes a unary call and returns a {@link ListenableFuture} to the response. The * {@code call} should not be already started. After calling this method, {@code call} should no * longer be used. * * @return a future for the single response message. */ public static ListenableFuture futureUnaryCall( ClientCall call, ReqT req) { GrpcFuture responseFuture = new GrpcFuture(call); asyncUnaryRequestCall(call, req, new UnaryStreamToFuture(responseFuture), false); return responseFuture; } /** * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a * checked exception. * *

If interrupted, the interrupt is restored before throwing an exception.. * * @throws java.util.concurrent.CancellationException * if {@code get} throws a {@code CancellationException}. * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException} * or an {@link InterruptedException}. */ private static V getUnchecked(Future future) { try { return future.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Status.CANCELLED .withDescription("Call was interrupted") .withCause(e) .asRuntimeException(); } catch (ExecutionException e) { throw toStatusRuntimeException(e.getCause()); } } /** * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an * exception will be generated from an {@link Status#UNKNOWN} status. */ private static StatusRuntimeException toStatusRuntimeException(Throwable t) { Throwable cause = checkNotNull(t, "t"); while (cause != null) { // If we have an embedded status, use it and replace the cause if (cause instanceof StatusException) { StatusException se = (StatusException) cause; return new StatusRuntimeException(se.getStatus(), se.getTrailers()); } else if (cause instanceof StatusRuntimeException) { StatusRuntimeException se = (StatusRuntimeException) cause; return new StatusRuntimeException(se.getStatus(), se.getTrailers()); } cause = cause.getCause(); } return Status.UNKNOWN.withDescription("unexpected exception").withCause(t) .asRuntimeException(); } /** * Cancels a call, and throws the exception. * * @param t must be a RuntimeException or Error */ private static RuntimeException cancelThrow(ClientCall call, Throwable t) { try { call.cancel(null, t); } catch (Throwable e) { assert e instanceof RuntimeException || e instanceof Error; logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e); } if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } // should be impossible throw new AssertionError(t); } private static void asyncUnaryRequestCall( ClientCall call, ReqT req, StreamObserver responseObserver, boolean streamingResponse) { asyncUnaryRequestCall( call, req, new StreamObserverToCallListenerAdapter( responseObserver, new CallToStreamObserverAdapter(call), streamingResponse), streamingResponse); } private static void asyncUnaryRequestCall( ClientCall call, ReqT req, ClientCall.Listener responseListener, boolean streamingResponse) { startCall(call, responseListener, streamingResponse); try { call.sendMessage(req); call.halfClose(); } catch (RuntimeException e) { throw cancelThrow(call, e); } catch (Error e) { throw cancelThrow(call, e); } } private static StreamObserver asyncStreamingRequestCall( ClientCall call, StreamObserver responseObserver, boolean streamingResponse) { CallToStreamObserverAdapter adapter = new CallToStreamObserverAdapter(call); startCall( call, new StreamObserverToCallListenerAdapter( responseObserver, adapter, streamingResponse), streamingResponse); return adapter; } private static void startCall( ClientCall call, ClientCall.Listener responseListener, boolean streamingResponse) { call.start(responseListener, new Metadata()); if (streamingResponse) { call.request(1); } else { // Initially ask for two responses from flow-control so that if a misbehaving server sends // more than one responses, we can catch it and fail it in the listener. call.request(2); } } private static final class CallToStreamObserverAdapter extends ClientCallStreamObserver { private boolean frozen; private final ClientCall call; private Runnable onReadyHandler; private boolean autoFlowControlEnabled = true; // Non private to avoid synthetic class CallToStreamObserverAdapter(ClientCall call) { this.call = call; } private void freeze() { this.frozen = true; } @Override public void onNext(T value) { call.sendMessage(value); } @Override public void onError(Throwable t) { call.cancel("Cancelled by client with StreamObserver.onError()", t); } @Override public void onCompleted() { call.halfClose(); } @Override public boolean isReady() { return call.isReady(); } @Override public void setOnReadyHandler(Runnable onReadyHandler) { if (frozen) { throw new IllegalStateException("Cannot alter onReadyHandler after call started"); } this.onReadyHandler = onReadyHandler; } @Override public void disableAutoInboundFlowControl() { if (frozen) { throw new IllegalStateException("Cannot disable auto flow control call started"); } autoFlowControlEnabled = false; } @Override public void request(int count) { call.request(count); } @Override public void setMessageCompression(boolean enable) { call.setMessageCompression(enable); } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { call.cancel(message, cause); } } private static final class StreamObserverToCallListenerAdapter extends ClientCall.Listener { private final StreamObserver observer; private final CallToStreamObserverAdapter adapter; private final boolean streamingResponse; private boolean firstResponseReceived; // Non private to avoid synthetic class StreamObserverToCallListenerAdapter( StreamObserver observer, CallToStreamObserverAdapter adapter, boolean streamingResponse) { this.observer = observer; this.streamingResponse = streamingResponse; this.adapter = adapter; if (observer instanceof ClientResponseObserver) { @SuppressWarnings("unchecked") ClientResponseObserver clientResponseObserver = (ClientResponseObserver) observer; clientResponseObserver.beforeStart(adapter); } adapter.freeze(); } @Override public void onHeaders(Metadata headers) { } @Override public void onMessage(RespT message) { if (firstResponseReceived && !streamingResponse) { throw Status.INTERNAL .withDescription("More than one responses received for unary or client-streaming call") .asRuntimeException(); } firstResponseReceived = true; observer.onNext(message); if (streamingResponse && adapter.autoFlowControlEnabled) { // Request delivery of the next inbound message. adapter.request(1); } } @Override public void onClose(Status status, Metadata trailers) { if (status.isOk()) { observer.onCompleted(); } else { observer.onError(status.asRuntimeException(trailers)); } } @Override public void onReady() { if (adapter.onReadyHandler != null) { adapter.onReadyHandler.run(); } } } /** * Completes a {@link GrpcFuture} using {@link StreamObserver} events. */ private static final class UnaryStreamToFuture extends ClientCall.Listener { private final GrpcFuture responseFuture; private RespT value; // Non private to avoid synthetic class UnaryStreamToFuture(GrpcFuture responseFuture) { this.responseFuture = responseFuture; } @Override public void onHeaders(Metadata headers) { } @Override public void onMessage(RespT value) { if (this.value != null) { throw Status.INTERNAL.withDescription("More than one value received for unary call") .asRuntimeException(); } this.value = value; } @Override public void onClose(Status status, Metadata trailers) { if (status.isOk()) { if (value == null) { // No value received so mark the future as an error responseFuture.setException( Status.INTERNAL.withDescription("No value received for unary call") .asRuntimeException(trailers)); } responseFuture.set(value); } else { responseFuture.setException(status.asRuntimeException(trailers)); } } } private static final class GrpcFuture extends AbstractFuture { private final ClientCall call; // Non private to avoid synthetic class GrpcFuture(ClientCall call) { this.call = call; } @Override protected void interruptTask() { call.cancel("GrpcFuture was cancelled", null); } @Override protected boolean set(@Nullable RespT resp) { return super.set(resp); } @Override protected boolean setException(Throwable throwable) { return super.setException(throwable); } @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped protected String pendingToString() { return MoreObjects.toStringHelper(this).add("clientCall", call).toString(); } } /** * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}. * *

The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a * separate thread from {@link Iterator} calls. */ // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error. private static final class BlockingResponseStream implements Iterator { // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close. private final BlockingQueue buffer = new ArrayBlockingQueue(2); private final ClientCall.Listener listener = new QueuingListener(); private final ClientCall call; /** May be null. */ private final ThreadlessExecutor threadless; // Only accessed when iterating. private Object last; // Non private to avoid synthetic class BlockingResponseStream(ClientCall call) { this(call, null); } // Non private to avoid synthetic class BlockingResponseStream(ClientCall call, ThreadlessExecutor threadless) { this.call = call; this.threadless = threadless; } ClientCall.Listener listener() { return listener; } private Object waitForNext() throws InterruptedException { if (threadless == null) { return buffer.take(); } else { Object next = buffer.poll(); while (next == null) { threadless.waitAndDrain(); next = buffer.poll(); } return next; } } @Override public boolean hasNext() { if (last == null) { try { // Will block here indefinitely waiting for content. RPC timeouts defend against permanent // hangs here as the call will become closed. last = waitForNext(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw Status.CANCELLED.withDescription("interrupted").withCause(ie).asRuntimeException(); } } if (last instanceof StatusRuntimeException) { // Rethrow the exception with a new stacktrace. StatusRuntimeException e = (StatusRuntimeException) last; throw e.getStatus().asRuntimeException(e.getTrailers()); } return last != this; } @Override public T next() { if (!hasNext()) { throw new NoSuchElementException(); } try { call.request(1); @SuppressWarnings("unchecked") T tmp = (T) last; return tmp; } finally { last = null; } } @Override public void remove() { throw new UnsupportedOperationException(); } private final class QueuingListener extends ClientCall.Listener { // Non private to avoid synthetic class QueuingListener() {} private boolean done = false; @Override public void onHeaders(Metadata headers) { } @Override public void onMessage(T value) { Preconditions.checkState(!done, "ClientCall already closed"); buffer.add(value); } @Override public void onClose(Status status, Metadata trailers) { Preconditions.checkState(!done, "ClientCall already closed"); if (status.isOk()) { buffer.add(BlockingResponseStream.this); } else { buffer.add(status.asRuntimeException(trailers)); } done = true; } } } private static final class ThreadlessExecutor implements Executor { private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName()); private final BlockingQueue queue = new LinkedBlockingQueue(); // Non private to avoid synthetic class ThreadlessExecutor() {} /** * Waits until there is a Runnable, then executes it and all queued Runnables after it. */ public void waitAndDrain() throws InterruptedException { Runnable runnable = queue.take(); while (runnable != null) { try { runnable.run(); } catch (Throwable t) { log.log(Level.WARNING, "Runnable threw exception", t); } runnable = queue.poll(); } } @Override public void execute(Runnable runnable) { queue.add(runnable); } } }