1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.stub;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.base.MoreObjects;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.AbstractFuture;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import io.grpc.CallOptions;
26 import io.grpc.Channel;
27 import io.grpc.ClientCall;
28 import io.grpc.Metadata;
29 import io.grpc.MethodDescriptor;
30 import io.grpc.Status;
31 import io.grpc.StatusException;
32 import io.grpc.StatusRuntimeException;
33 import java.util.Iterator;
34 import java.util.NoSuchElementException;
35 import java.util.concurrent.ArrayBlockingQueue;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.logging.Level;
42 import java.util.logging.Logger;
43 import javax.annotation.Nullable;
44 
45 /**
46  * Utility functions for processing different call idioms. We have one-to-one correspondence
47  * between utilities in this class and the potential signatures in a generated stub class so
48  * that the runtime can vary behavior without requiring regeneration of the stub.
49  */
50 public final class ClientCalls {
51 
52   private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());
53 
54   // Prevent instantiation
ClientCalls()55   private ClientCalls() {}
56 
57   /**
58    * Executes a unary call with a response {@link StreamObserver}.  The {@code call} should not be
59    * already started.  After calling this method, {@code call} should no longer be used.
60    */
asyncUnaryCall( ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver)61   public static <ReqT, RespT> void asyncUnaryCall(
62       ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
63     asyncUnaryRequestCall(call, req, responseObserver, false);
64   }
65 
66   /**
67    * Executes a server-streaming call with a response {@link StreamObserver}.  The {@code call}
68    * should not be already started.  After calling this method, {@code call} should no longer be
69    * used.
70    */
asyncServerStreamingCall( ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver)71   public static <ReqT, RespT> void asyncServerStreamingCall(
72       ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
73     asyncUnaryRequestCall(call, req, responseObserver, true);
74   }
75 
76   /**
77    * Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
78    * The {@code call} should not be already started.  After calling this method, {@code call}
79    * should no longer be used.
80    *
81    * @return request stream observer.
82    */
asyncClientStreamingCall( ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver)83   public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
84       ClientCall<ReqT, RespT> call,
85       StreamObserver<RespT> responseObserver) {
86     return asyncStreamingRequestCall(call, responseObserver, false);
87   }
88 
89   /**
90    * Executes a bidirectional-streaming call.  The {@code call} should not be already started.
91    * After calling this method, {@code call} should no longer be used.
92    *
93    * @return request stream observer.
94    */
asyncBidiStreamingCall( ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver)95   public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
96       ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
97     return asyncStreamingRequestCall(call, responseObserver, true);
98   }
99 
100   /**
101    * Executes a unary call and blocks on the response.  The {@code call} should not be already
102    * started.  After calling this method, {@code call} should no longer be used.
103    *
104    * @return the single response message.
105    */
blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req)106   public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
107     try {
108       return getUnchecked(futureUnaryCall(call, req));
109     } catch (RuntimeException e) {
110       throw cancelThrow(call, e);
111     } catch (Error e) {
112       throw cancelThrow(call, e);
113     }
114   }
115 
116   /**
117    * Executes a unary call and blocks on the response.  The {@code call} should not be already
118    * started.  After calling this method, {@code call} should no longer be used.
119    *
120    * @return the single response message.
121    */
blockingUnaryCall( Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req)122   public static <ReqT, RespT> RespT blockingUnaryCall(
123       Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
124     ThreadlessExecutor executor = new ThreadlessExecutor();
125     ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
126     try {
127       ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
128       while (!responseFuture.isDone()) {
129         try {
130           executor.waitAndDrain();
131         } catch (InterruptedException e) {
132           Thread.currentThread().interrupt();
133           throw Status.CANCELLED
134               .withDescription("Call was interrupted")
135               .withCause(e)
136               .asRuntimeException();
137         }
138       }
139       return getUnchecked(responseFuture);
140     } catch (RuntimeException e) {
141       throw cancelThrow(call, e);
142     } catch (Error e) {
143       throw cancelThrow(call, e);
144     }
145   }
146 
147   /**
148    * Executes a server-streaming call returning a blocking {@link Iterator} over the
149    * response stream.  The {@code call} should not be already started.  After calling this method,
150    * {@code call} should no longer be used.
151    *
152    * @return an iterator over the response stream.
153    */
154   // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
blockingServerStreamingCall( ClientCall<ReqT, RespT> call, ReqT req)155   public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
156       ClientCall<ReqT, RespT> call, ReqT req) {
157     BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call);
158     asyncUnaryRequestCall(call, req, result.listener(), true);
159     return result;
160   }
161 
162   /**
163    * Executes a server-streaming call returning a blocking {@link Iterator} over the
164    * response stream.  The {@code call} should not be already started.  After calling this method,
165    * {@code call} should no longer be used.
166    *
167    * @return an iterator over the response stream.
168    */
169   // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
blockingServerStreamingCall( Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req)170   public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
171       Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
172     ThreadlessExecutor executor = new ThreadlessExecutor();
173     ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
174     BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call, executor);
175     asyncUnaryRequestCall(call, req, result.listener(), true);
176     return result;
177   }
178 
179   /**
180    * Executes a unary call and returns a {@link ListenableFuture} to the response.  The
181    * {@code call} should not be already started.  After calling this method, {@code call} should no
182    * longer be used.
183    *
184    * @return a future for the single response message.
185    */
futureUnaryCall( ClientCall<ReqT, RespT> call, ReqT req)186   public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
187       ClientCall<ReqT, RespT> call, ReqT req) {
188     GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
189     asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<RespT>(responseFuture), false);
190     return responseFuture;
191   }
192 
193   /**
194    * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a
195    * checked exception.
196    *
197    * <p>If interrupted, the interrupt is restored before throwing an exception..
198    *
199    * @throws java.util.concurrent.CancellationException
200    *     if {@code get} throws a {@code CancellationException}.
201    * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException}
202    *     or an {@link InterruptedException}.
203    */
getUnchecked(Future<V> future)204   private static <V> V getUnchecked(Future<V> future) {
205     try {
206       return future.get();
207     } catch (InterruptedException e) {
208       Thread.currentThread().interrupt();
209       throw Status.CANCELLED
210           .withDescription("Call was interrupted")
211           .withCause(e)
212           .asRuntimeException();
213     } catch (ExecutionException e) {
214       throw toStatusRuntimeException(e.getCause());
215     }
216   }
217 
218   /**
219    * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an
220    * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will
221    * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an
222    * exception will be generated from an {@link Status#UNKNOWN} status.
223    */
toStatusRuntimeException(Throwable t)224   private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
225     Throwable cause = checkNotNull(t, "t");
226     while (cause != null) {
227       // If we have an embedded status, use it and replace the cause
228       if (cause instanceof StatusException) {
229         StatusException se = (StatusException) cause;
230         return new StatusRuntimeException(se.getStatus(), se.getTrailers());
231       } else if (cause instanceof StatusRuntimeException) {
232         StatusRuntimeException se = (StatusRuntimeException) cause;
233         return new StatusRuntimeException(se.getStatus(), se.getTrailers());
234       }
235       cause = cause.getCause();
236     }
237     return Status.UNKNOWN.withDescription("unexpected exception").withCause(t)
238         .asRuntimeException();
239   }
240 
241   /**
242    * Cancels a call, and throws the exception.
243    *
244    * @param t must be a RuntimeException or Error
245    */
cancelThrow(ClientCall<?, ?> call, Throwable t)246   private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
247     try {
248       call.cancel(null, t);
249     } catch (Throwable e) {
250       assert e instanceof RuntimeException || e instanceof Error;
251       logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
252     }
253     if (t instanceof RuntimeException) {
254       throw (RuntimeException) t;
255     } else if (t instanceof Error) {
256       throw (Error) t;
257     }
258     // should be impossible
259     throw new AssertionError(t);
260   }
261 
asyncUnaryRequestCall( ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver, boolean streamingResponse)262   private static <ReqT, RespT> void asyncUnaryRequestCall(
263       ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver,
264       boolean streamingResponse) {
265     asyncUnaryRequestCall(
266         call,
267         req,
268         new StreamObserverToCallListenerAdapter<ReqT, RespT>(
269             responseObserver,
270             new CallToStreamObserverAdapter<ReqT>(call),
271             streamingResponse),
272         streamingResponse);
273   }
274 
asyncUnaryRequestCall( ClientCall<ReqT, RespT> call, ReqT req, ClientCall.Listener<RespT> responseListener, boolean streamingResponse)275   private static <ReqT, RespT> void asyncUnaryRequestCall(
276       ClientCall<ReqT, RespT> call,
277       ReqT req,
278       ClientCall.Listener<RespT> responseListener,
279       boolean streamingResponse) {
280     startCall(call, responseListener, streamingResponse);
281     try {
282       call.sendMessage(req);
283       call.halfClose();
284     } catch (RuntimeException e) {
285       throw cancelThrow(call, e);
286     } catch (Error e) {
287       throw cancelThrow(call, e);
288     }
289   }
290 
asyncStreamingRequestCall( ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver, boolean streamingResponse)291   private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
292       ClientCall<ReqT, RespT> call,
293       StreamObserver<RespT> responseObserver,
294       boolean streamingResponse) {
295     CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call);
296     startCall(
297         call,
298         new StreamObserverToCallListenerAdapter<ReqT, RespT>(
299             responseObserver, adapter, streamingResponse),
300         streamingResponse);
301     return adapter;
302   }
303 
startCall( ClientCall<ReqT, RespT> call, ClientCall.Listener<RespT> responseListener, boolean streamingResponse)304   private static <ReqT, RespT> void startCall(
305       ClientCall<ReqT, RespT> call,
306       ClientCall.Listener<RespT> responseListener,
307       boolean streamingResponse) {
308     call.start(responseListener, new Metadata());
309     if (streamingResponse) {
310       call.request(1);
311     } else {
312       // Initially ask for two responses from flow-control so that if a misbehaving server sends
313       // more than one responses, we can catch it and fail it in the listener.
314       call.request(2);
315     }
316   }
317 
318   private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
319     private boolean frozen;
320     private final ClientCall<T, ?> call;
321     private Runnable onReadyHandler;
322     private boolean autoFlowControlEnabled = true;
323 
324     // Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call)325     CallToStreamObserverAdapter(ClientCall<T, ?> call) {
326       this.call = call;
327     }
328 
freeze()329     private void freeze() {
330       this.frozen = true;
331     }
332 
333     @Override
onNext(T value)334     public void onNext(T value) {
335       call.sendMessage(value);
336     }
337 
338     @Override
onError(Throwable t)339     public void onError(Throwable t) {
340       call.cancel("Cancelled by client with StreamObserver.onError()", t);
341     }
342 
343     @Override
onCompleted()344     public void onCompleted() {
345       call.halfClose();
346     }
347 
348     @Override
isReady()349     public boolean isReady() {
350       return call.isReady();
351     }
352 
353     @Override
setOnReadyHandler(Runnable onReadyHandler)354     public void setOnReadyHandler(Runnable onReadyHandler) {
355       if (frozen) {
356         throw new IllegalStateException("Cannot alter onReadyHandler after call started");
357       }
358       this.onReadyHandler = onReadyHandler;
359     }
360 
361     @Override
disableAutoInboundFlowControl()362     public void disableAutoInboundFlowControl() {
363       if (frozen) {
364         throw new IllegalStateException("Cannot disable auto flow control call started");
365       }
366       autoFlowControlEnabled = false;
367     }
368 
369     @Override
request(int count)370     public void request(int count) {
371       call.request(count);
372     }
373 
374     @Override
setMessageCompression(boolean enable)375     public void setMessageCompression(boolean enable) {
376       call.setMessageCompression(enable);
377     }
378 
379     @Override
cancel(@ullable String message, @Nullable Throwable cause)380     public void cancel(@Nullable String message, @Nullable Throwable cause) {
381       call.cancel(message, cause);
382     }
383   }
384 
385   private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
386       extends ClientCall.Listener<RespT> {
387     private final StreamObserver<RespT> observer;
388     private final CallToStreamObserverAdapter<ReqT> adapter;
389     private final boolean streamingResponse;
390     private boolean firstResponseReceived;
391 
392     // Non private to avoid synthetic class
StreamObserverToCallListenerAdapter( StreamObserver<RespT> observer, CallToStreamObserverAdapter<ReqT> adapter, boolean streamingResponse)393     StreamObserverToCallListenerAdapter(
394         StreamObserver<RespT> observer,
395         CallToStreamObserverAdapter<ReqT> adapter,
396         boolean streamingResponse) {
397       this.observer = observer;
398       this.streamingResponse = streamingResponse;
399       this.adapter = adapter;
400       if (observer instanceof ClientResponseObserver) {
401         @SuppressWarnings("unchecked")
402         ClientResponseObserver<ReqT, RespT> clientResponseObserver =
403             (ClientResponseObserver<ReqT, RespT>) observer;
404         clientResponseObserver.beforeStart(adapter);
405       }
406       adapter.freeze();
407     }
408 
409     @Override
onHeaders(Metadata headers)410     public void onHeaders(Metadata headers) {
411     }
412 
413     @Override
onMessage(RespT message)414     public void onMessage(RespT message) {
415       if (firstResponseReceived && !streamingResponse) {
416         throw Status.INTERNAL
417             .withDescription("More than one responses received for unary or client-streaming call")
418             .asRuntimeException();
419       }
420       firstResponseReceived = true;
421       observer.onNext(message);
422 
423       if (streamingResponse && adapter.autoFlowControlEnabled) {
424         // Request delivery of the next inbound message.
425         adapter.request(1);
426       }
427     }
428 
429     @Override
onClose(Status status, Metadata trailers)430     public void onClose(Status status, Metadata trailers) {
431       if (status.isOk()) {
432         observer.onCompleted();
433       } else {
434         observer.onError(status.asRuntimeException(trailers));
435       }
436     }
437 
438     @Override
onReady()439     public void onReady() {
440       if (adapter.onReadyHandler != null) {
441         adapter.onReadyHandler.run();
442       }
443     }
444   }
445 
446   /**
447    * Completes a {@link GrpcFuture} using {@link StreamObserver} events.
448    */
449   private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
450     private final GrpcFuture<RespT> responseFuture;
451     private RespT value;
452 
453     // Non private to avoid synthetic class
UnaryStreamToFuture(GrpcFuture<RespT> responseFuture)454     UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
455       this.responseFuture = responseFuture;
456     }
457 
458     @Override
onHeaders(Metadata headers)459     public void onHeaders(Metadata headers) {
460     }
461 
462     @Override
onMessage(RespT value)463     public void onMessage(RespT value) {
464       if (this.value != null) {
465         throw Status.INTERNAL.withDescription("More than one value received for unary call")
466             .asRuntimeException();
467       }
468       this.value = value;
469     }
470 
471     @Override
onClose(Status status, Metadata trailers)472     public void onClose(Status status, Metadata trailers) {
473       if (status.isOk()) {
474         if (value == null) {
475           // No value received so mark the future as an error
476           responseFuture.setException(
477               Status.INTERNAL.withDescription("No value received for unary call")
478                   .asRuntimeException(trailers));
479         }
480         responseFuture.set(value);
481       } else {
482         responseFuture.setException(status.asRuntimeException(trailers));
483       }
484     }
485   }
486 
487   private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
488     private final ClientCall<?, RespT> call;
489 
490     // Non private to avoid synthetic class
GrpcFuture(ClientCall<?, RespT> call)491     GrpcFuture(ClientCall<?, RespT> call) {
492       this.call = call;
493     }
494 
495     @Override
interruptTask()496     protected void interruptTask() {
497       call.cancel("GrpcFuture was cancelled", null);
498     }
499 
500     @Override
set(@ullable RespT resp)501     protected boolean set(@Nullable RespT resp) {
502       return super.set(resp);
503     }
504 
505     @Override
setException(Throwable throwable)506     protected boolean setException(Throwable throwable) {
507       return super.setException(throwable);
508     }
509 
510     @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped
pendingToString()511     protected String pendingToString() {
512       return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
513     }
514   }
515 
516   /**
517    * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
518    *
519    * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
520    * separate thread from {@link Iterator} calls.
521    */
522   // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
523   private static final class BlockingResponseStream<T> implements Iterator<T> {
524     // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
525     private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2);
526     private final ClientCall.Listener<T> listener = new QueuingListener();
527     private final ClientCall<?, T> call;
528     /** May be null. */
529     private final ThreadlessExecutor threadless;
530     // Only accessed when iterating.
531     private Object last;
532 
533     // Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call)534     BlockingResponseStream(ClientCall<?, T> call) {
535       this(call, null);
536     }
537 
538     // Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless)539     BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
540       this.call = call;
541       this.threadless = threadless;
542     }
543 
listener()544     ClientCall.Listener<T> listener() {
545       return listener;
546     }
547 
waitForNext()548     private Object waitForNext() throws InterruptedException {
549       if (threadless == null) {
550         return buffer.take();
551       } else {
552         Object next = buffer.poll();
553         while (next == null) {
554           threadless.waitAndDrain();
555           next = buffer.poll();
556         }
557         return next;
558       }
559     }
560 
561     @Override
hasNext()562     public boolean hasNext() {
563       if (last == null) {
564         try {
565           // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
566           // hangs here as the call will become closed.
567           last = waitForNext();
568         } catch (InterruptedException ie) {
569           Thread.currentThread().interrupt();
570           throw Status.CANCELLED.withDescription("interrupted").withCause(ie).asRuntimeException();
571         }
572       }
573       if (last instanceof StatusRuntimeException) {
574         // Rethrow the exception with a new stacktrace.
575         StatusRuntimeException e = (StatusRuntimeException) last;
576         throw e.getStatus().asRuntimeException(e.getTrailers());
577       }
578       return last != this;
579     }
580 
581     @Override
next()582     public T next() {
583       if (!hasNext()) {
584         throw new NoSuchElementException();
585       }
586       try {
587         call.request(1);
588         @SuppressWarnings("unchecked")
589         T tmp = (T) last;
590         return tmp;
591       } finally {
592         last = null;
593       }
594     }
595 
596     @Override
remove()597     public void remove() {
598       throw new UnsupportedOperationException();
599     }
600 
601     private final class QueuingListener extends ClientCall.Listener<T> {
602       // Non private to avoid synthetic class
QueuingListener()603       QueuingListener() {}
604 
605       private boolean done = false;
606 
607       @Override
onHeaders(Metadata headers)608       public void onHeaders(Metadata headers) {
609       }
610 
611       @Override
onMessage(T value)612       public void onMessage(T value) {
613         Preconditions.checkState(!done, "ClientCall already closed");
614         buffer.add(value);
615       }
616 
617       @Override
onClose(Status status, Metadata trailers)618       public void onClose(Status status, Metadata trailers) {
619         Preconditions.checkState(!done, "ClientCall already closed");
620         if (status.isOk()) {
621           buffer.add(BlockingResponseStream.this);
622         } else {
623           buffer.add(status.asRuntimeException(trailers));
624         }
625         done = true;
626       }
627     }
628   }
629 
630   private static final class ThreadlessExecutor implements Executor {
631     private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
632 
633     private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
634 
635     // Non private to avoid synthetic class
ThreadlessExecutor()636     ThreadlessExecutor() {}
637 
638     /**
639      * Waits until there is a Runnable, then executes it and all queued Runnables after it.
640      */
waitAndDrain()641     public void waitAndDrain() throws InterruptedException {
642       Runnable runnable = queue.take();
643       while (runnable != null) {
644         try {
645           runnable.run();
646         } catch (Throwable t) {
647           log.log(Level.WARNING, "Runnable threw exception", t);
648         }
649         runnable = queue.poll();
650       }
651     }
652 
653     @Override
execute(Runnable runnable)654     public void execute(Runnable runnable) {
655       queue.add(runnable);
656     }
657   }
658 }
659