//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Locale;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;

42 */ 43 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint 44 { 45 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); 46 47 private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); 48 private final SelectorManager.SelectSet _selectSet; 49 private final SelectorManager _manager; 50 private SelectionKey _key; 51 private final Runnable _handler = new Runnable() 52 { 53 public void run() { handle(); } 54 }; 55 56 /** The desired value for {@link SelectionKey#interestOps()} */ 57 private int _interestOps; 58 59 /** 60 * The connection instance is the handler for any IO activity on the endpoint. 61 * There is a different type of connection for HTTP, AJP, WebSocket and 62 * ProxyConnect. The connection may change for an SCEP as it is upgraded 63 * from HTTP to proxy connect or websocket. 64 */ 65 private volatile AsyncConnection _connection; 66 67 private static final int STATE_NEEDS_DISPATCH=-1; 68 private static final int STATE_UNDISPATCHED=0; 69 private static final int STATE_DISPATCHED=1; 70 private static final int STATE_ASYNC=2; 71 private int _state; 72 73 private boolean _onIdle; 74 75 /** true if the last write operation succeed and wrote all offered bytes */ 76 private volatile boolean _writable = true; 77 78 79 /** True if a thread has is blocked in {@link #blockReadable(long)} */ 80 private boolean _readBlocked; 81 82 /** True if a thread has is blocked in {@link #blockWritable(long)} */ 83 private boolean _writeBlocked; 84 85 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ 86 private boolean _open; 87 88 private volatile long _idleTimestamp; 89 private volatile boolean _checkIdle; 90 91 private boolean _interruptable; 92 93 private boolean _ishut; 94 95 /* ------------------------------------------------------------ */ SelectChannelEndPoint(SocketChannel channel, SelectSet 
selectSet, SelectionKey key, int maxIdleTime)96 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) 97 throws IOException 98 { 99 super(channel, maxIdleTime); 100 101 _manager = selectSet.getManager(); 102 _selectSet = selectSet; 103 _state=STATE_UNDISPATCHED; 104 _onIdle=false; 105 _open=true; 106 _key = key; 107 108 setCheckForIdle(true); 109 } 110 111 /* ------------------------------------------------------------ */ getSelectionKey()112 public SelectionKey getSelectionKey() 113 { 114 synchronized (this) 115 { 116 return _key; 117 } 118 } 119 120 /* ------------------------------------------------------------ */ getSelectManager()121 public SelectorManager getSelectManager() 122 { 123 return _manager; 124 } 125 126 /* ------------------------------------------------------------ */ getConnection()127 public Connection getConnection() 128 { 129 return _connection; 130 } 131 132 /* ------------------------------------------------------------ */ setConnection(Connection connection)133 public void setConnection(Connection connection) 134 { 135 Connection old=_connection; 136 _connection=(AsyncConnection)connection; 137 if (old!=null && old!=_connection) 138 _manager.endPointUpgraded(this,old); 139 } 140 141 /* ------------------------------------------------------------ */ getIdleTimestamp()142 public long getIdleTimestamp() 143 { 144 return _idleTimestamp; 145 } 146 147 /* ------------------------------------------------------------ */ 148 /** Called by selectSet to schedule handling 149 * 150 */ schedule()151 public void schedule() 152 { 153 synchronized (this) 154 { 155 // If there is no key, then do nothing 156 if (_key == null || !_key.isValid()) 157 { 158 _readBlocked=false; 159 _writeBlocked=false; 160 this.notifyAll(); 161 return; 162 } 163 164 // If there are threads dispatched reading and writing 165 if (_readBlocked || _writeBlocked) 166 { 167 // assert _dispatched; 168 if (_readBlocked && 
_key.isReadable()) 169 _readBlocked=false; 170 if (_writeBlocked && _key.isWritable()) 171 _writeBlocked=false; 172 173 // wake them up is as good as a dispatched. 174 this.notifyAll(); 175 176 // we are not interested in further selecting 177 _key.interestOps(0); 178 if (_state<STATE_DISPATCHED) 179 updateKey(); 180 return; 181 } 182 183 // Remove writeable op 184 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) 185 { 186 // Remove writeable op 187 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; 188 _key.interestOps(_interestOps); 189 _writable = true; // Once writable is in ops, only removed with dispatch. 190 } 191 192 // If dispatched, then deregister interest 193 if (_state>=STATE_DISPATCHED) 194 _key.interestOps(0); 195 else 196 { 197 // other wise do the dispatch 198 dispatch(); 199 if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0()) 200 { 201 _key.interestOps(0); 202 } 203 } 204 } 205 } 206 207 /* ------------------------------------------------------------ */ asyncDispatch()208 public void asyncDispatch() 209 { 210 synchronized(this) 211 { 212 switch(_state) 213 { 214 case STATE_NEEDS_DISPATCH: 215 case STATE_UNDISPATCHED: 216 dispatch(); 217 break; 218 219 case STATE_DISPATCHED: 220 case STATE_ASYNC: 221 _state=STATE_ASYNC; 222 break; 223 } 224 } 225 } 226 227 /* ------------------------------------------------------------ */ dispatch()228 public void dispatch() 229 { 230 synchronized(this) 231 { 232 if (_state<=STATE_UNDISPATCHED) 233 { 234 if (_onIdle) 235 _state = STATE_NEEDS_DISPATCH; 236 else 237 { 238 _state = STATE_DISPATCHED; 239 boolean dispatched = _manager.dispatch(_handler); 240 if(!dispatched) 241 { 242 _state = STATE_NEEDS_DISPATCH; 243 LOG.warn("Dispatched Failed! 
"+this+" to "+_manager); 244 updateKey(); 245 } 246 } 247 } 248 } 249 } 250 251 /* ------------------------------------------------------------ */ 252 /** 253 * Called when a dispatched thread is no longer handling the endpoint. 254 * The selection key operations are updated. 255 * @return If false is returned, the endpoint has been redispatched and 256 * thread must keep handling the endpoint. 257 */ undispatch()258 protected boolean undispatch() 259 { 260 synchronized (this) 261 { 262 switch(_state) 263 { 264 case STATE_ASYNC: 265 _state=STATE_DISPATCHED; 266 return false; 267 268 default: 269 _state=STATE_UNDISPATCHED; 270 updateKey(); 271 return true; 272 } 273 } 274 } 275 276 /* ------------------------------------------------------------ */ cancelTimeout(Task task)277 public void cancelTimeout(Task task) 278 { 279 getSelectSet().cancelTimeout(task); 280 } 281 282 /* ------------------------------------------------------------ */ scheduleTimeout(Task task, long timeoutMs)283 public void scheduleTimeout(Task task, long timeoutMs) 284 { 285 getSelectSet().scheduleTimeout(task,timeoutMs); 286 } 287 288 /* ------------------------------------------------------------ */ setCheckForIdle(boolean check)289 public void setCheckForIdle(boolean check) 290 { 291 if (check) 292 { 293 _idleTimestamp=System.currentTimeMillis(); 294 _checkIdle=true; 295 } 296 else 297 _checkIdle=false; 298 } 299 300 /* ------------------------------------------------------------ */ isCheckForIdle()301 public boolean isCheckForIdle() 302 { 303 return _checkIdle; 304 } 305 306 /* ------------------------------------------------------------ */ notIdle()307 protected void notIdle() 308 { 309 _idleTimestamp=System.currentTimeMillis(); 310 } 311 312 /* ------------------------------------------------------------ */ checkIdleTimestamp(long now)313 public void checkIdleTimestamp(long now) 314 { 315 if (isCheckForIdle() && _maxIdleTime>0) 316 { 317 final long idleForMs=now-_idleTimestamp; 318 319 if 
(idleForMs>_maxIdleTime) 320 { 321 // Don't idle out again until onIdleExpired task completes. 322 setCheckForIdle(false); 323 _manager.dispatch(new Runnable() 324 { 325 public void run() 326 { 327 try 328 { 329 onIdleExpired(idleForMs); 330 } 331 finally 332 { 333 setCheckForIdle(true); 334 } 335 } 336 }); 337 } 338 } 339 } 340 341 /* ------------------------------------------------------------ */ onIdleExpired(long idleForMs)342 public void onIdleExpired(long idleForMs) 343 { 344 try 345 { 346 synchronized (this) 347 { 348 _onIdle=true; 349 } 350 351 _connection.onIdleExpired(idleForMs); 352 } 353 finally 354 { 355 synchronized (this) 356 { 357 _onIdle=false; 358 if (_state==STATE_NEEDS_DISPATCH) 359 dispatch(); 360 } 361 } 362 } 363 364 /* ------------------------------------------------------------ */ 365 @Override fill(Buffer buffer)366 public int fill(Buffer buffer) throws IOException 367 { 368 int fill=super.fill(buffer); 369 if (fill>0) 370 notIdle(); 371 return fill; 372 } 373 374 /* ------------------------------------------------------------ */ 375 @Override flush(Buffer header, Buffer buffer, Buffer trailer)376 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException 377 { 378 int l = super.flush(header, buffer, trailer); 379 380 // If there was something to write and it wasn't written, then we are not writable. 
381 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) 382 { 383 synchronized (this) 384 { 385 _writable=false; 386 if (_state<STATE_DISPATCHED) 387 updateKey(); 388 } 389 } 390 else if (l>0) 391 { 392 _writable=true; 393 notIdle(); 394 } 395 return l; 396 } 397 398 /* ------------------------------------------------------------ */ 399 /* 400 */ 401 @Override flush(Buffer buffer)402 public int flush(Buffer buffer) throws IOException 403 { 404 int l = super.flush(buffer); 405 406 // If there was something to write and it wasn't written, then we are not writable. 407 if (l==0 && buffer!=null && buffer.hasContent()) 408 { 409 synchronized (this) 410 { 411 _writable=false; 412 if (_state<STATE_DISPATCHED) 413 updateKey(); 414 } 415 } 416 else if (l>0) 417 { 418 _writable=true; 419 notIdle(); 420 } 421 422 return l; 423 } 424 425 /* ------------------------------------------------------------ */ 426 /* 427 * Allows thread to block waiting for further events. 
428 */ 429 @Override blockReadable(long timeoutMs)430 public boolean blockReadable(long timeoutMs) throws IOException 431 { 432 synchronized (this) 433 { 434 if (isInputShutdown()) 435 throw new EofException(); 436 437 long now=_selectSet.getNow(); 438 long end=now+timeoutMs; 439 boolean check=isCheckForIdle(); 440 setCheckForIdle(true); 441 try 442 { 443 _readBlocked=true; 444 while (!isInputShutdown() && _readBlocked) 445 { 446 try 447 { 448 updateKey(); 449 this.wait(timeoutMs>0?(end-now):10000); 450 } 451 catch (final InterruptedException e) 452 { 453 LOG.warn(e); 454 if (_interruptable) 455 throw new InterruptedIOException(){{this.initCause(e);}}; 456 } 457 finally 458 { 459 now=_selectSet.getNow(); 460 } 461 462 if (_readBlocked && timeoutMs>0 && now>=end) 463 return false; 464 } 465 } 466 finally 467 { 468 _readBlocked=false; 469 setCheckForIdle(check); 470 } 471 } 472 return true; 473 } 474 475 /* ------------------------------------------------------------ */ 476 /* 477 * Allows thread to block waiting for further events. 
478 */ 479 @Override blockWritable(long timeoutMs)480 public boolean blockWritable(long timeoutMs) throws IOException 481 { 482 synchronized (this) 483 { 484 if (isOutputShutdown()) 485 throw new EofException(); 486 487 long now=_selectSet.getNow(); 488 long end=now+timeoutMs; 489 boolean check=isCheckForIdle(); 490 setCheckForIdle(true); 491 try 492 { 493 _writeBlocked=true; 494 while (_writeBlocked && !isOutputShutdown()) 495 { 496 try 497 { 498 updateKey(); 499 this.wait(timeoutMs>0?(end-now):10000); 500 } 501 catch (final InterruptedException e) 502 { 503 LOG.warn(e); 504 if (_interruptable) 505 throw new InterruptedIOException(){{this.initCause(e);}}; 506 } 507 finally 508 { 509 now=_selectSet.getNow(); 510 } 511 if (_writeBlocked && timeoutMs>0 && now>=end) 512 return false; 513 } 514 } 515 finally 516 { 517 _writeBlocked=false; 518 setCheckForIdle(check); 519 } 520 } 521 return true; 522 } 523 524 /* ------------------------------------------------------------ */ 525 /** Set the interruptable mode of the endpoint. 526 * If set to false (default), then interrupts are assumed to be spurious 527 * and blocking operations continue unless the endpoint has been closed. 528 * If true, then interrupts of blocking operations result in InterruptedIOExceptions 529 * being thrown. 
530 * @param interupable 531 */ setInterruptable(boolean interupable)532 public void setInterruptable(boolean interupable) 533 { 534 synchronized (this) 535 { 536 _interruptable=interupable; 537 } 538 } 539 540 /* ------------------------------------------------------------ */ isInterruptable()541 public boolean isInterruptable() 542 { 543 return _interruptable; 544 } 545 546 /* ------------------------------------------------------------ */ 547 /** 548 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite() 549 */ scheduleWrite()550 public void scheduleWrite() 551 { 552 if (_writable) 553 LOG.debug("Required scheduleWrite {}",this); 554 555 _writable=false; 556 updateKey(); 557 } 558 559 /* ------------------------------------------------------------ */ isWritable()560 public boolean isWritable() 561 { 562 return _writable; 563 } 564 565 /* ------------------------------------------------------------ */ hasProgressed()566 public boolean hasProgressed() 567 { 568 return false; 569 } 570 571 /* ------------------------------------------------------------ */ 572 /** 573 * Updates selection key. Adds operations types to the selection key as needed. No operations 574 * are removed as this is only done during dispatch. This method records the new key and 575 * schedules a call to doUpdateKey to do the keyChange 576 */ updateKey()577 private void updateKey() 578 { 579 final boolean changed; 580 synchronized (this) 581 { 582 int current_ops=-1; 583 if (getChannel().isOpen()) 584 { 585 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); 586 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); 587 588 _interestOps = 589 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) 590 | ((!_socket.isOutputShutdown()&& write_interest) ? 
SelectionKey.OP_WRITE : 0); 591 try 592 { 593 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); 594 } 595 catch(Exception e) 596 { 597 _key=null; 598 LOG.ignore(e); 599 } 600 } 601 changed=_interestOps!=current_ops; 602 } 603 604 if(changed) 605 { 606 _selectSet.addChange(this); 607 _selectSet.wakeup(); 608 } 609 } 610 611 612 /* ------------------------------------------------------------ */ 613 /** 614 * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey 615 */ doUpdateKey()616 void doUpdateKey() 617 { 618 synchronized (this) 619 { 620 if (getChannel().isOpen()) 621 { 622 if (_interestOps>0) 623 { 624 if (_key==null || !_key.isValid()) 625 { 626 SelectableChannel sc = (SelectableChannel)getChannel(); 627 if (sc.isRegistered()) 628 { 629 updateKey(); 630 } 631 else 632 { 633 try 634 { 635 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); 636 } 637 catch (Exception e) 638 { 639 LOG.ignore(e); 640 if (_key!=null && _key.isValid()) 641 { 642 _key.cancel(); 643 } 644 645 if (_open) 646 { 647 _selectSet.destroyEndPoint(this); 648 } 649 _open=false; 650 _key = null; 651 } 652 } 653 } 654 else 655 { 656 _key.interestOps(_interestOps); 657 } 658 } 659 else 660 { 661 if (_key!=null && _key.isValid()) 662 _key.interestOps(0); 663 else 664 _key=null; 665 } 666 } 667 else 668 { 669 if (_key!=null && _key.isValid()) 670 _key.cancel(); 671 672 if (_open) 673 { 674 _open=false; 675 _selectSet.destroyEndPoint(this); 676 } 677 _key = null; 678 } 679 } 680 } 681 682 /* ------------------------------------------------------------ */ 683 /* 684 */ handle()685 protected void handle() 686 { 687 boolean dispatched=true; 688 try 689 { 690 while(dispatched) 691 { 692 try 693 { 694 while(true) 695 { 696 final AsyncConnection next = (AsyncConnection)_connection.handle(); 697 if (next!=_connection) 698 { 699 LOG.debug("{} replaced {}",next,_connection); 700 Connection old=_connection; 701 
_connection=next; 702 _manager.endPointUpgraded(this,old); 703 continue; 704 } 705 break; 706 } 707 } 708 catch (ClosedChannelException e) 709 { 710 LOG.ignore(e); 711 } 712 catch (EofException e) 713 { 714 LOG.debug("EOF", e); 715 try{close();} 716 catch(IOException e2){LOG.ignore(e2);} 717 } 718 catch (IOException e) 719 { 720 LOG.warn(e.toString()); 721 try{close();} 722 catch(IOException e2){LOG.ignore(e2);} 723 } 724 catch (Throwable e) 725 { 726 LOG.warn("handle failed", e); 727 try{close();} 728 catch(IOException e2){LOG.ignore(e2);} 729 } 730 finally 731 { 732 if (!_ishut && isInputShutdown() && isOpen()) 733 { 734 _ishut=true; 735 try 736 { 737 _connection.onInputShutdown(); 738 } 739 catch(Throwable x) 740 { 741 LOG.warn("onInputShutdown failed", x); 742 try{close();} 743 catch(IOException e2){LOG.ignore(e2);} 744 } 745 finally 746 { 747 updateKey(); 748 } 749 } 750 dispatched=!undispatch(); 751 } 752 } 753 } 754 finally 755 { 756 if (dispatched) 757 { 758 dispatched=!undispatch(); 759 while (dispatched) 760 { 761 LOG.warn("SCEP.run() finally DISPATCHED"); 762 dispatched=!undispatch(); 763 } 764 } 765 } 766 } 767 768 /* ------------------------------------------------------------ */ 769 /* 770 * @see org.eclipse.io.nio.ChannelEndPoint#close() 771 */ 772 @Override close()773 public void close() throws IOException 774 { 775 // On unix systems there is a JVM issue that if you cancel before closing, it can 776 // cause the selector to block waiting for a channel to close and that channel can 777 // block waiting for the remote end. But on windows, if you don't cancel before a 778 // close, then the selector can block anyway! 
779 // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318 780 if (WORK_AROUND_JVM_BUG_6346658) 781 { 782 try 783 { 784 SelectionKey key = _key; 785 if (key!=null) 786 key.cancel(); 787 } 788 catch (Throwable e) 789 { 790 LOG.ignore(e); 791 } 792 } 793 794 try 795 { 796 super.close(); 797 } 798 catch (IOException e) 799 { 800 LOG.ignore(e); 801 } 802 finally 803 { 804 updateKey(); 805 } 806 } 807 808 /* ------------------------------------------------------------ */ 809 @Override toString()810 public String toString() 811 { 812 // Do NOT use synchronized (this) 813 // because it's very easy to deadlock when debugging is enabled. 814 // We do a best effort to print the right toString() and that's it. 815 SelectionKey key = _key; 816 String keyString = ""; 817 if (key != null) 818 { 819 if (key.isValid()) 820 { 821 if (key.isReadable()) 822 keyString += "r"; 823 if (key.isWritable()) 824 keyString += "w"; 825 } 826 else 827 { 828 keyString += "!"; 829 } 830 } 831 else 832 { 833 keyString += "-"; 834 } 835 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", 836 hashCode(), 837 _socket.getRemoteSocketAddress(), 838 _socket.getLocalSocketAddress(), 839 _state, 840 isOpen(), 841 isInputShutdown(), 842 isOutputShutdown(), 843 _readBlocked, 844 _writeBlocked, 845 _writable, 846 _interestOps, 847 keyString, 848 _connection); 849 } 850 851 /* ------------------------------------------------------------ */ getSelectSet()852 public SelectSet getSelectSet() 853 { 854 return _selectSet; 855 } 856 857 /* ------------------------------------------------------------ */ 858 /** 859 * Don't set the SoTimeout 860 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) 861 */ 862 @Override setMaxIdleTime(int timeMs)863 public void setMaxIdleTime(int timeMs) throws IOException 864 { 865 _maxIdleTime=timeMs; 866 } 867 868 } 869