1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.stub; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.base.MoreObjects; 22 import com.google.common.base.Preconditions; 23 import com.google.common.util.concurrent.AbstractFuture; 24 import com.google.common.util.concurrent.ListenableFuture; 25 import io.grpc.CallOptions; 26 import io.grpc.Channel; 27 import io.grpc.ClientCall; 28 import io.grpc.Metadata; 29 import io.grpc.MethodDescriptor; 30 import io.grpc.Status; 31 import io.grpc.StatusException; 32 import io.grpc.StatusRuntimeException; 33 import java.util.Iterator; 34 import java.util.NoSuchElementException; 35 import java.util.concurrent.ArrayBlockingQueue; 36 import java.util.concurrent.BlockingQueue; 37 import java.util.concurrent.ExecutionException; 38 import java.util.concurrent.Executor; 39 import java.util.concurrent.Future; 40 import java.util.concurrent.LinkedBlockingQueue; 41 import java.util.logging.Level; 42 import java.util.logging.Logger; 43 import javax.annotation.Nullable; 44 45 /** 46 * Utility functions for processing different call idioms. We have one-to-one correspondence 47 * between utilities in this class and the potential signatures in a generated stub class so 48 * that the runtime can vary behavior without requiring regeneration of the stub. 49 */ 50 public final class ClientCalls { 51 52 private static final Logger logger = Logger.getLogger(ClientCalls.class.getName()); 53 54 // Prevent instantiation ClientCalls()55 private ClientCalls() {} 56 57 /** 58 * Executes a unary call with a response {@link StreamObserver}. The {@code call} should not be 59 * already started. After calling this method, {@code call} should no longer be used. 60 */ asyncUnaryCall( ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver)61 public static <ReqT, RespT> void asyncUnaryCall( 62 ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) { 63 asyncUnaryRequestCall(call, req, responseObserver, false); 64 } 65 66 /** 67 * Executes a server-streaming call with a response {@link StreamObserver}. The {@code call} 68 * should not be already started. After calling this method, {@code call} should no longer be 69 * used. 70 */ asyncServerStreamingCall( ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver)71 public static <ReqT, RespT> void asyncServerStreamingCall( 72 ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) { 73 asyncUnaryRequestCall(call, req, responseObserver, true); 74 } 75 76 /** 77 * Executes a client-streaming call returning a {@link StreamObserver} for the request messages. 78 * The {@code call} should not be already started. After calling this method, {@code call} 79 * should no longer be used. 80 * 81 * @return request stream observer. 82 */ asyncClientStreamingCall( ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver)83 public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall( 84 ClientCall<ReqT, RespT> call, 85 StreamObserver<RespT> responseObserver) { 86 return asyncStreamingRequestCall(call, responseObserver, false); 87 } 88 89 /** 90 * Executes a bidirectional-streaming call. The {@code call} should not be already started. 91 * After calling this method, {@code call} should no longer be used. 92 * 93 * @return request stream observer. 94 */ asyncBidiStreamingCall( ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver)95 public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall( 96 ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) { 97 return asyncStreamingRequestCall(call, responseObserver, true); 98 } 99 100 /** 101 * Executes a unary call and blocks on the response. The {@code call} should not be already 102 * started. After calling this method, {@code call} should no longer be used. 103 * 104 * @return the single response message. 105 */ blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req)106 public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) { 107 try { 108 return getUnchecked(futureUnaryCall(call, req)); 109 } catch (RuntimeException e) { 110 throw cancelThrow(call, e); 111 } catch (Error e) { 112 throw cancelThrow(call, e); 113 } 114 } 115 116 /** 117 * Executes a unary call and blocks on the response. The {@code call} should not be already 118 * started. After calling this method, {@code call} should no longer be used. 119 * 120 * @return the single response message. 121 */ blockingUnaryCall( Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req)122 public static <ReqT, RespT> RespT blockingUnaryCall( 123 Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) { 124 ThreadlessExecutor executor = new ThreadlessExecutor(); 125 ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor)); 126 try { 127 ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req); 128 while (!responseFuture.isDone()) { 129 try { 130 executor.waitAndDrain(); 131 } catch (InterruptedException e) { 132 Thread.currentThread().interrupt(); 133 throw Status.CANCELLED 134 .withDescription("Call was interrupted") 135 .withCause(e) 136 .asRuntimeException(); 137 } 138 } 139 return getUnchecked(responseFuture); 140 } catch (RuntimeException e) { 141 throw cancelThrow(call, e); 142 } catch (Error e) { 143 throw cancelThrow(call, e); 144 } 145 } 146 147 /** 148 * Executes a server-streaming call returning a blocking {@link Iterator} over the 149 * response stream. The {@code call} should not be already started. After calling this method, 150 * {@code call} should no longer be used. 151 * 152 * @return an iterator over the response stream. 153 */ 154 // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. blockingServerStreamingCall( ClientCall<ReqT, RespT> call, ReqT req)155 public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( 156 ClientCall<ReqT, RespT> call, ReqT req) { 157 BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call); 158 asyncUnaryRequestCall(call, req, result.listener(), true); 159 return result; 160 } 161 162 /** 163 * Executes a server-streaming call returning a blocking {@link Iterator} over the 164 * response stream. The {@code call} should not be already started. After calling this method, 165 * {@code call} should no longer be used. 166 * 167 * @return an iterator over the response stream. 168 */ 169 // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. blockingServerStreamingCall( Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req)170 public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( 171 Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) { 172 ThreadlessExecutor executor = new ThreadlessExecutor(); 173 ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor)); 174 BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call, executor); 175 asyncUnaryRequestCall(call, req, result.listener(), true); 176 return result; 177 } 178 179 /** 180 * Executes a unary call and returns a {@link ListenableFuture} to the response. The 181 * {@code call} should not be already started. After calling this method, {@code call} should no 182 * longer be used. 183 * 184 * @return a future for the single response message. 185 */ futureUnaryCall( ClientCall<ReqT, RespT> call, ReqT req)186 public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall( 187 ClientCall<ReqT, RespT> call, ReqT req) { 188 GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call); 189 asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<RespT>(responseFuture), false); 190 return responseFuture; 191 } 192 193 /** 194 * Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a 195 * checked exception. 196 * 197 * <p>If interrupted, the interrupt is restored before throwing an exception.. 198 * 199 * @throws java.util.concurrent.CancellationException 200 * if {@code get} throws a {@code CancellationException}. 201 * @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException} 202 * or an {@link InterruptedException}. 203 */ getUnchecked(Future<V> future)204 private static <V> V getUnchecked(Future<V> future) { 205 try { 206 return future.get(); 207 } catch (InterruptedException e) { 208 Thread.currentThread().interrupt(); 209 throw Status.CANCELLED 210 .withDescription("Call was interrupted") 211 .withCause(e) 212 .asRuntimeException(); 213 } catch (ExecutionException e) { 214 throw toStatusRuntimeException(e.getCause()); 215 } 216 } 217 218 /** 219 * Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an 220 * embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will 221 * contain the embedded trailers and status, with the given exception as the cause. Otherwise, an 222 * exception will be generated from an {@link Status#UNKNOWN} status. 223 */ toStatusRuntimeException(Throwable t)224 private static StatusRuntimeException toStatusRuntimeException(Throwable t) { 225 Throwable cause = checkNotNull(t, "t"); 226 while (cause != null) { 227 // If we have an embedded status, use it and replace the cause 228 if (cause instanceof StatusException) { 229 StatusException se = (StatusException) cause; 230 return new StatusRuntimeException(se.getStatus(), se.getTrailers()); 231 } else if (cause instanceof StatusRuntimeException) { 232 StatusRuntimeException se = (StatusRuntimeException) cause; 233 return new StatusRuntimeException(se.getStatus(), se.getTrailers()); 234 } 235 cause = cause.getCause(); 236 } 237 return Status.UNKNOWN.withDescription("unexpected exception").withCause(t) 238 .asRuntimeException(); 239 } 240 241 /** 242 * Cancels a call, and throws the exception. 243 * 244 * @param t must be a RuntimeException or Error 245 */ cancelThrow(ClientCall<?, ?> call, Throwable t)246 private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) { 247 try { 248 call.cancel(null, t); 249 } catch (Throwable e) { 250 assert e instanceof RuntimeException || e instanceof Error; 251 logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e); 252 } 253 if (t instanceof RuntimeException) { 254 throw (RuntimeException) t; 255 } else if (t instanceof Error) { 256 throw (Error) t; 257 } 258 // should be impossible 259 throw new AssertionError(t); 260 } 261 asyncUnaryRequestCall( ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver, boolean streamingResponse)262 private static <ReqT, RespT> void asyncUnaryRequestCall( 263 ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver, 264 boolean streamingResponse) { 265 asyncUnaryRequestCall( 266 call, 267 req, 268 new StreamObserverToCallListenerAdapter<ReqT, RespT>( 269 responseObserver, 270 new CallToStreamObserverAdapter<ReqT>(call), 271 streamingResponse), 272 streamingResponse); 273 } 274 asyncUnaryRequestCall( ClientCall<ReqT, RespT> call, ReqT req, ClientCall.Listener<RespT> responseListener, boolean streamingResponse)275 private static <ReqT, RespT> void asyncUnaryRequestCall( 276 ClientCall<ReqT, RespT> call, 277 ReqT req, 278 ClientCall.Listener<RespT> responseListener, 279 boolean streamingResponse) { 280 startCall(call, responseListener, streamingResponse); 281 try { 282 call.sendMessage(req); 283 call.halfClose(); 284 } catch (RuntimeException e) { 285 throw cancelThrow(call, e); 286 } catch (Error e) { 287 throw cancelThrow(call, e); 288 } 289 } 290 asyncStreamingRequestCall( ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver, boolean streamingResponse)291 private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall( 292 ClientCall<ReqT, RespT> call, 293 StreamObserver<RespT> responseObserver, 294 boolean streamingResponse) { 295 CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call); 296 startCall( 297 call, 298 new StreamObserverToCallListenerAdapter<ReqT, RespT>( 299 responseObserver, adapter, streamingResponse), 300 streamingResponse); 301 return adapter; 302 } 303 startCall( ClientCall<ReqT, RespT> call, ClientCall.Listener<RespT> responseListener, boolean streamingResponse)304 private static <ReqT, RespT> void startCall( 305 ClientCall<ReqT, RespT> call, 306 ClientCall.Listener<RespT> responseListener, 307 boolean streamingResponse) { 308 call.start(responseListener, new Metadata()); 309 if (streamingResponse) { 310 call.request(1); 311 } else { 312 // Initially ask for two responses from flow-control so that if a misbehaving server sends 313 // more than one responses, we can catch it and fail it in the listener. 314 call.request(2); 315 } 316 } 317 318 private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> { 319 private boolean frozen; 320 private final ClientCall<T, ?> call; 321 private Runnable onReadyHandler; 322 private boolean autoFlowControlEnabled = true; 323 324 // Non private to avoid synthetic class CallToStreamObserverAdapter(ClientCall<T, ?> call)325 CallToStreamObserverAdapter(ClientCall<T, ?> call) { 326 this.call = call; 327 } 328 freeze()329 private void freeze() { 330 this.frozen = true; 331 } 332 333 @Override onNext(T value)334 public void onNext(T value) { 335 call.sendMessage(value); 336 } 337 338 @Override onError(Throwable t)339 public void onError(Throwable t) { 340 call.cancel("Cancelled by client with StreamObserver.onError()", t); 341 } 342 343 @Override onCompleted()344 public void onCompleted() { 345 call.halfClose(); 346 } 347 348 @Override isReady()349 public boolean isReady() { 350 return call.isReady(); 351 } 352 353 @Override setOnReadyHandler(Runnable onReadyHandler)354 public void setOnReadyHandler(Runnable onReadyHandler) { 355 if (frozen) { 356 throw new IllegalStateException("Cannot alter onReadyHandler after call started"); 357 } 358 this.onReadyHandler = onReadyHandler; 359 } 360 361 @Override disableAutoInboundFlowControl()362 public void disableAutoInboundFlowControl() { 363 if (frozen) { 364 throw new IllegalStateException("Cannot disable auto flow control call started"); 365 } 366 autoFlowControlEnabled = false; 367 } 368 369 @Override request(int count)370 public void request(int count) { 371 call.request(count); 372 } 373 374 @Override setMessageCompression(boolean enable)375 public void setMessageCompression(boolean enable) { 376 call.setMessageCompression(enable); 377 } 378 379 @Override cancel(@ullable String message, @Nullable Throwable cause)380 public void cancel(@Nullable String message, @Nullable Throwable cause) { 381 call.cancel(message, cause); 382 } 383 } 384 385 private static final class StreamObserverToCallListenerAdapter<ReqT, RespT> 386 extends ClientCall.Listener<RespT> { 387 private final StreamObserver<RespT> observer; 388 private final CallToStreamObserverAdapter<ReqT> adapter; 389 private final boolean streamingResponse; 390 private boolean firstResponseReceived; 391 392 // Non private to avoid synthetic class StreamObserverToCallListenerAdapter( StreamObserver<RespT> observer, CallToStreamObserverAdapter<ReqT> adapter, boolean streamingResponse)393 StreamObserverToCallListenerAdapter( 394 StreamObserver<RespT> observer, 395 CallToStreamObserverAdapter<ReqT> adapter, 396 boolean streamingResponse) { 397 this.observer = observer; 398 this.streamingResponse = streamingResponse; 399 this.adapter = adapter; 400 if (observer instanceof ClientResponseObserver) { 401 @SuppressWarnings("unchecked") 402 ClientResponseObserver<ReqT, RespT> clientResponseObserver = 403 (ClientResponseObserver<ReqT, RespT>) observer; 404 clientResponseObserver.beforeStart(adapter); 405 } 406 adapter.freeze(); 407 } 408 409 @Override onHeaders(Metadata headers)410 public void onHeaders(Metadata headers) { 411 } 412 413 @Override onMessage(RespT message)414 public void onMessage(RespT message) { 415 if (firstResponseReceived && !streamingResponse) { 416 throw Status.INTERNAL 417 .withDescription("More than one responses received for unary or client-streaming call") 418 .asRuntimeException(); 419 } 420 firstResponseReceived = true; 421 observer.onNext(message); 422 423 if (streamingResponse && adapter.autoFlowControlEnabled) { 424 // Request delivery of the next inbound message. 425 adapter.request(1); 426 } 427 } 428 429 @Override onClose(Status status, Metadata trailers)430 public void onClose(Status status, Metadata trailers) { 431 if (status.isOk()) { 432 observer.onCompleted(); 433 } else { 434 observer.onError(status.asRuntimeException(trailers)); 435 } 436 } 437 438 @Override onReady()439 public void onReady() { 440 if (adapter.onReadyHandler != null) { 441 adapter.onReadyHandler.run(); 442 } 443 } 444 } 445 446 /** 447 * Completes a {@link GrpcFuture} using {@link StreamObserver} events. 448 */ 449 private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> { 450 private final GrpcFuture<RespT> responseFuture; 451 private RespT value; 452 453 // Non private to avoid synthetic class UnaryStreamToFuture(GrpcFuture<RespT> responseFuture)454 UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) { 455 this.responseFuture = responseFuture; 456 } 457 458 @Override onHeaders(Metadata headers)459 public void onHeaders(Metadata headers) { 460 } 461 462 @Override onMessage(RespT value)463 public void onMessage(RespT value) { 464 if (this.value != null) { 465 throw Status.INTERNAL.withDescription("More than one value received for unary call") 466 .asRuntimeException(); 467 } 468 this.value = value; 469 } 470 471 @Override onClose(Status status, Metadata trailers)472 public void onClose(Status status, Metadata trailers) { 473 if (status.isOk()) { 474 if (value == null) { 475 // No value received so mark the future as an error 476 responseFuture.setException( 477 Status.INTERNAL.withDescription("No value received for unary call") 478 .asRuntimeException(trailers)); 479 } 480 responseFuture.set(value); 481 } else { 482 responseFuture.setException(status.asRuntimeException(trailers)); 483 } 484 } 485 } 486 487 private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> { 488 private final ClientCall<?, RespT> call; 489 490 // Non private to avoid synthetic class GrpcFuture(ClientCall<?, RespT> call)491 GrpcFuture(ClientCall<?, RespT> call) { 492 this.call = call; 493 } 494 495 @Override interruptTask()496 protected void interruptTask() { 497 call.cancel("GrpcFuture was cancelled", null); 498 } 499 500 @Override set(@ullable RespT resp)501 protected boolean set(@Nullable RespT resp) { 502 return super.set(resp); 503 } 504 505 @Override setException(Throwable throwable)506 protected boolean setException(Throwable throwable) { 507 return super.setException(throwable); 508 } 509 510 @SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped pendingToString()511 protected String pendingToString() { 512 return MoreObjects.toStringHelper(this).add("clientCall", call).toString(); 513 } 514 } 515 516 /** 517 * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}. 518 * 519 * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a 520 * separate thread from {@link Iterator} calls. 521 */ 522 // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error. 523 private static final class BlockingResponseStream<T> implements Iterator<T> { 524 // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close. 525 private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2); 526 private final ClientCall.Listener<T> listener = new QueuingListener(); 527 private final ClientCall<?, T> call; 528 /** May be null. */ 529 private final ThreadlessExecutor threadless; 530 // Only accessed when iterating. 531 private Object last; 532 533 // Non private to avoid synthetic class BlockingResponseStream(ClientCall<?, T> call)534 BlockingResponseStream(ClientCall<?, T> call) { 535 this(call, null); 536 } 537 538 // Non private to avoid synthetic class BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless)539 BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) { 540 this.call = call; 541 this.threadless = threadless; 542 } 543 listener()544 ClientCall.Listener<T> listener() { 545 return listener; 546 } 547 waitForNext()548 private Object waitForNext() throws InterruptedException { 549 if (threadless == null) { 550 return buffer.take(); 551 } else { 552 Object next = buffer.poll(); 553 while (next == null) { 554 threadless.waitAndDrain(); 555 next = buffer.poll(); 556 } 557 return next; 558 } 559 } 560 561 @Override hasNext()562 public boolean hasNext() { 563 if (last == null) { 564 try { 565 // Will block here indefinitely waiting for content. RPC timeouts defend against permanent 566 // hangs here as the call will become closed. 567 last = waitForNext(); 568 } catch (InterruptedException ie) { 569 Thread.currentThread().interrupt(); 570 throw Status.CANCELLED.withDescription("interrupted").withCause(ie).asRuntimeException(); 571 } 572 } 573 if (last instanceof StatusRuntimeException) { 574 // Rethrow the exception with a new stacktrace. 575 StatusRuntimeException e = (StatusRuntimeException) last; 576 throw e.getStatus().asRuntimeException(e.getTrailers()); 577 } 578 return last != this; 579 } 580 581 @Override next()582 public T next() { 583 if (!hasNext()) { 584 throw new NoSuchElementException(); 585 } 586 try { 587 call.request(1); 588 @SuppressWarnings("unchecked") 589 T tmp = (T) last; 590 return tmp; 591 } finally { 592 last = null; 593 } 594 } 595 596 @Override remove()597 public void remove() { 598 throw new UnsupportedOperationException(); 599 } 600 601 private final class QueuingListener extends ClientCall.Listener<T> { 602 // Non private to avoid synthetic class QueuingListener()603 QueuingListener() {} 604 605 private boolean done = false; 606 607 @Override onHeaders(Metadata headers)608 public void onHeaders(Metadata headers) { 609 } 610 611 @Override onMessage(T value)612 public void onMessage(T value) { 613 Preconditions.checkState(!done, "ClientCall already closed"); 614 buffer.add(value); 615 } 616 617 @Override onClose(Status status, Metadata trailers)618 public void onClose(Status status, Metadata trailers) { 619 Preconditions.checkState(!done, "ClientCall already closed"); 620 if (status.isOk()) { 621 buffer.add(BlockingResponseStream.this); 622 } else { 623 buffer.add(status.asRuntimeException(trailers)); 624 } 625 done = true; 626 } 627 } 628 } 629 630 private static final class ThreadlessExecutor implements Executor { 631 private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName()); 632 633 private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); 634 635 // Non private to avoid synthetic class ThreadlessExecutor()636 ThreadlessExecutor() {} 637 638 /** 639 * Waits until there is a Runnable, then executes it and all queued Runnables after it. 640 */ waitAndDrain()641 public void waitAndDrain() throws InterruptedException { 642 Runnable runnable = queue.take(); 643 while (runnable != null) { 644 try { 645 runnable.run(); 646 } catch (Throwable t) { 647 log.log(Level.WARNING, "Runnable threw exception", t); 648 } 649 runnable = queue.poll(); 650 } 651 } 652 653 @Override execute(Runnable runnable)654 public void execute(Runnable runnable) { 655 queue.add(runnable); 656 } 657 } 658 } 659