1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.io.nio; 20 21 import java.io.IOException; 22 import java.nio.channels.CancelledKeyException; 23 import java.nio.channels.Channel; 24 import java.nio.channels.ClosedSelectorException; 25 import java.nio.channels.SelectableChannel; 26 import java.nio.channels.SelectionKey; 27 import java.nio.channels.Selector; 28 import java.nio.channels.ServerSocketChannel; 29 import java.nio.channels.SocketChannel; 30 import java.util.ArrayList; 31 import java.util.List; 32 import java.util.Set; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.ConcurrentLinkedQueue; 35 import java.util.concurrent.ConcurrentMap; 36 import java.util.concurrent.CountDownLatch; 37 import java.util.concurrent.TimeUnit; 38 39 import org.eclipse.jetty.io.AsyncEndPoint; 40 import org.eclipse.jetty.io.ConnectedEndPoint; 41 import org.eclipse.jetty.io.Connection; 42 import org.eclipse.jetty.io.EndPoint; 43 import org.eclipse.jetty.util.TypeUtil; 44 import org.eclipse.jetty.util.component.AbstractLifeCycle; 45 import org.eclipse.jetty.util.component.AggregateLifeCycle; 46 import org.eclipse.jetty.util.component.Dumpable; 47 import org.eclipse.jetty.util.log.Log; 48 import org.eclipse.jetty.util.log.Logger; 49 import org.eclipse.jetty.util.thread.Timeout; 50 import org.eclipse.jetty.util.thread.Timeout.Task; 51 52 53 /* ------------------------------------------------------------ */ 54 /** 55 * The Selector Manager manages and number of SelectSets to allow 56 * NIO scheduling to scale to large numbers of connections. 57 * <p> 58 */ 59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable 60 { 61 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); 62 63 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); 64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); 65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); 66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); 67 68 private int _maxIdleTime; 69 private int _lowResourcesMaxIdleTime; 70 private long _lowResourcesConnections; 71 private SelectSet[] _selectSet; 72 private int _selectSets=1; 73 private volatile int _set=0; 74 private boolean _deferringInterestedOps0=true; 75 private int _selectorPriorityDelta=0; 76 77 /* ------------------------------------------------------------ */ 78 /** 79 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 80 * @see #setLowResourcesMaxIdleTime(long) 81 */ setMaxIdleTime(long maxIdleTime)82 public void setMaxIdleTime(long maxIdleTime) 83 { 84 _maxIdleTime=(int)maxIdleTime; 85 } 86 87 /* ------------------------------------------------------------ */ 88 /** 89 * @param selectSets number of select sets to create 90 */ setSelectSets(int selectSets)91 public void setSelectSets(int selectSets) 92 { 93 long lrc = _lowResourcesConnections * _selectSets; 94 _selectSets=selectSets; 95 _lowResourcesConnections=lrc/_selectSets; 96 } 97 98 /* ------------------------------------------------------------ */ 99 /** 100 * @return the max idle time 101 */ getMaxIdleTime()102 public long getMaxIdleTime() 103 { 104 return _maxIdleTime; 105 } 106 107 /* ------------------------------------------------------------ */ 108 /** 109 * @return the number of select sets in use 110 */ getSelectSets()111 public int getSelectSets() 112 { 113 return _selectSets; 114 } 115 116 /* ------------------------------------------------------------ */ 117 /** 118 * @param i 119 * @return The select set 120 */ getSelectSet(int i)121 public SelectSet getSelectSet(int i) 122 { 123 return _selectSet[i]; 124 } 125 126 /* ------------------------------------------------------------ */ 127 /** Register a channel 128 * @param channel 129 * @param att Attached Object 130 */ register(SocketChannel channel, Object att)131 public void register(SocketChannel channel, Object att) 132 { 133 // The ++ increment here is not atomic, but it does not matter. 134 // so long as the value changes sometimes, then connections will 135 // be distributed over the available sets. 136 137 int s=_set++; 138 if (s<0) 139 s=-s; 140 s=s%_selectSets; 141 SelectSet[] sets=_selectSet; 142 if (sets!=null) 143 { 144 SelectSet set=sets[s]; 145 set.addChange(channel,att); 146 set.wakeup(); 147 } 148 } 149 150 151 /* ------------------------------------------------------------ */ 152 /** Register a channel 153 * @param channel 154 */ register(SocketChannel channel)155 public void register(SocketChannel channel) 156 { 157 // The ++ increment here is not atomic, but it does not matter. 158 // so long as the value changes sometimes, then connections will 159 // be distributed over the available sets. 160 161 int s=_set++; 162 if (s<0) 163 s=-s; 164 s=s%_selectSets; 165 SelectSet[] sets=_selectSet; 166 if (sets!=null) 167 { 168 SelectSet set=sets[s]; 169 set.addChange(channel); 170 set.wakeup(); 171 } 172 } 173 174 /* ------------------------------------------------------------ */ 175 /** Register a {@link ServerSocketChannel} 176 * @param acceptChannel 177 */ register(ServerSocketChannel acceptChannel)178 public void register(ServerSocketChannel acceptChannel) 179 { 180 int s=_set++; 181 if (s<0) 182 s=-s; 183 s=s%_selectSets; 184 SelectSet set=_selectSet[s]; 185 set.addChange(acceptChannel); 186 set.wakeup(); 187 } 188 189 /* ------------------------------------------------------------ */ 190 /** 191 * @return delta The value to add to the selector thread priority. 192 */ getSelectorPriorityDelta()193 public int getSelectorPriorityDelta() 194 { 195 return _selectorPriorityDelta; 196 } 197 198 /* ------------------------------------------------------------ */ 199 /** Set the selector thread priorty delta. 200 * @param delta The value to add to the selector thread priority. 201 */ setSelectorPriorityDelta(int delta)202 public void setSelectorPriorityDelta(int delta) 203 { 204 _selectorPriorityDelta=delta; 205 } 206 207 208 /* ------------------------------------------------------------ */ 209 /** 210 * @return the lowResourcesConnections 211 */ getLowResourcesConnections()212 public long getLowResourcesConnections() 213 { 214 return _lowResourcesConnections*_selectSets; 215 } 216 217 /* ------------------------------------------------------------ */ 218 /** 219 * Set the number of connections, which if exceeded places this manager in low resources state. 220 * This is not an exact measure as the connection count is averaged over the select sets. 221 * @param lowResourcesConnections the number of connections 222 * @see #setLowResourcesMaxIdleTime(long) 223 */ setLowResourcesConnections(long lowResourcesConnections)224 public void setLowResourcesConnections(long lowResourcesConnections) 225 { 226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; 227 } 228 229 /* ------------------------------------------------------------ */ 230 /** 231 * @return the lowResourcesMaxIdleTime 232 */ getLowResourcesMaxIdleTime()233 public long getLowResourcesMaxIdleTime() 234 { 235 return _lowResourcesMaxIdleTime; 236 } 237 238 /* ------------------------------------------------------------ */ 239 /** 240 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} 241 * @see #setMaxIdleTime(long) 242 */ setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) 244 { 245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; 246 } 247 248 249 /* ------------------------------------------------------------------------------- */ dispatch(Runnable task)250 public abstract boolean dispatch(Runnable task); 251 252 /* ------------------------------------------------------------ */ 253 /* (non-Javadoc) 254 * @see org.eclipse.component.AbstractLifeCycle#doStart() 255 */ 256 @Override doStart()257 protected void doStart() throws Exception 258 { 259 _selectSet = new SelectSet[_selectSets]; 260 for (int i=0;i<_selectSet.length;i++) 261 _selectSet[i]= new SelectSet(i); 262 263 super.doStart(); 264 265 // start a thread to Select 266 for (int i=0;i<getSelectSets();i++) 267 { 268 final int id=i; 269 boolean selecting=dispatch(new Runnable() 270 { 271 public void run() 272 { 273 String name=Thread.currentThread().getName(); 274 int priority=Thread.currentThread().getPriority(); 275 try 276 { 277 SelectSet[] sets=_selectSet; 278 if (sets==null) 279 return; 280 SelectSet set=sets[id]; 281 282 Thread.currentThread().setName(name+" Selector"+id); 283 if (getSelectorPriorityDelta()!=0) 284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); 285 LOG.debug("Starting {} on {}",Thread.currentThread(),this); 286 while (isRunning()) 287 { 288 try 289 { 290 set.doSelect(); 291 } 292 catch(IOException e) 293 { 294 LOG.ignore(e); 295 } 296 catch(Exception e) 297 { 298 LOG.warn(e); 299 } 300 } 301 } 302 finally 303 { 304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); 305 Thread.currentThread().setName(name); 306 if (getSelectorPriorityDelta()!=0) 307 Thread.currentThread().setPriority(priority); 308 } 309 } 310 311 }); 312 313 if (!selecting) 314 throw new IllegalStateException("!Selecting"); 315 } 316 } 317 318 319 /* ------------------------------------------------------------------------------- */ 320 @Override doStop()321 protected void doStop() throws Exception 322 { 323 SelectSet[] sets= _selectSet; 324 _selectSet=null; 325 if (sets!=null) 326 { 327 for (SelectSet set : sets) 328 { 329 if (set!=null) 330 set.stop(); 331 } 332 } 333 super.doStop(); 334 } 335 336 /* ------------------------------------------------------------ */ 337 /** 338 * @param endpoint 339 */ endPointClosed(SelectChannelEndPoint endpoint)340 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); 341 342 /* ------------------------------------------------------------ */ 343 /** 344 * @param endpoint 345 */ endPointOpened(SelectChannelEndPoint endpoint)346 protected abstract void endPointOpened(SelectChannelEndPoint endpoint); 347 348 /* ------------------------------------------------------------ */ endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection)349 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); 350 351 /* ------------------------------------------------------------------------------- */ newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)352 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); 353 354 /* ------------------------------------------------------------ */ 355 /** 356 * Create a new end point 357 * @param channel 358 * @param selectSet 359 * @param sKey the selection key 360 * @return the new endpoint {@link SelectChannelEndPoint} 361 * @throws IOException 362 */ newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey)363 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; 364 365 /* ------------------------------------------------------------------------------- */ connectionFailed(SocketChannel channel,Throwable ex,Object attachment)366 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) 367 { 368 LOG.warn(ex+","+channel+","+attachment); 369 LOG.debug(ex); 370 } 371 372 /* ------------------------------------------------------------ */ dump()373 public String dump() 374 { 375 return AggregateLifeCycle.dump(this); 376 } 377 378 /* ------------------------------------------------------------ */ dump(Appendable out, String indent)379 public void dump(Appendable out, String indent) throws IOException 380 { 381 AggregateLifeCycle.dumpObject(out,this); 382 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); 383 } 384 385 386 /* ------------------------------------------------------------------------------- */ 387 /* ------------------------------------------------------------------------------- */ 388 /* ------------------------------------------------------------------------------- */ 389 public class SelectSet implements Dumpable 390 { 391 private final int _setID; 392 private final Timeout _timeout; 393 394 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); 395 396 private volatile Selector _selector; 397 398 private volatile Thread _selecting; 399 private int _busySelects; 400 private long _monitorNext; 401 private boolean _pausing; 402 private boolean _paused; 403 private volatile long _idleTick; 404 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); 405 406 /* ------------------------------------------------------------ */ SelectSet(int acceptorID)407 SelectSet(int acceptorID) throws Exception 408 { 409 _setID=acceptorID; 410 411 _idleTick = System.currentTimeMillis(); 412 _timeout = new Timeout(this); 413 _timeout.setDuration(0L); 414 415 // create a selector; 416 _selector = Selector.open(); 417 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; 418 } 419 420 /* ------------------------------------------------------------ */ addChange(Object change)421 public void addChange(Object change) 422 { 423 _changes.add(change); 424 } 425 426 /* ------------------------------------------------------------ */ addChange(SelectableChannel channel, Object att)427 public void addChange(SelectableChannel channel, Object att) 428 { 429 if (att==null) 430 addChange(channel); 431 else if (att instanceof EndPoint) 432 addChange(att); 433 else 434 addChange(new ChannelAndAttachment(channel,att)); 435 } 436 437 /* ------------------------------------------------------------ */ 438 /** 439 * Select and dispatch tasks found from changes and the selector. 440 * 441 * @throws IOException 442 */ doSelect()443 public void doSelect() throws IOException 444 { 445 try 446 { 447 _selecting=Thread.currentThread(); 448 final Selector selector=_selector; 449 // Stopped concurrently ? 450 if (selector == null) 451 return; 452 453 // Make any key changes required 454 Object change; 455 int changes=_changes.size(); 456 while (changes-->0 && (change=_changes.poll())!=null) 457 { 458 Channel ch=null; 459 SelectionKey key=null; 460 461 try 462 { 463 if (change instanceof EndPoint) 464 { 465 // Update the operations for a key. 466 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; 467 ch=endpoint.getChannel(); 468 endpoint.doUpdateKey(); 469 } 470 else if (change instanceof ChannelAndAttachment) 471 { 472 // finish accepting/connecting this connection 473 final ChannelAndAttachment asc = (ChannelAndAttachment)change; 474 final SelectableChannel channel=asc._channel; 475 ch=channel; 476 final Object att = asc._attachment; 477 478 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) 479 { 480 key = channel.register(selector,SelectionKey.OP_READ,att); 481 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); 482 key.attach(endpoint); 483 endpoint.schedule(); 484 } 485 else if (channel.isOpen()) 486 { 487 key = channel.register(selector,SelectionKey.OP_CONNECT,att); 488 } 489 } 490 else if (change instanceof SocketChannel) 491 { 492 // Newly registered channel 493 final SocketChannel channel=(SocketChannel)change; 494 ch=channel; 495 key = channel.register(selector,SelectionKey.OP_READ,null); 496 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 497 key.attach(endpoint); 498 endpoint.schedule(); 499 } 500 else if (change instanceof ChangeTask) 501 { 502 ((Runnable)change).run(); 503 } 504 else if (change instanceof Runnable) 505 { 506 dispatch((Runnable)change); 507 } 508 else 509 throw new IllegalArgumentException(change.toString()); 510 } 511 catch (CancelledKeyException e) 512 { 513 LOG.ignore(e); 514 } 515 catch (Throwable e) 516 { 517 if (isRunning()) 518 LOG.warn(e); 519 else 520 LOG.debug(e); 521 522 try 523 { 524 if (ch!=null) 525 ch.close(); 526 } 527 catch(IOException e2) 528 { 529 LOG.debug(e2); 530 } 531 } 532 } 533 534 535 // Do and instant select to see if any connections can be handled. 536 int selected=selector.selectNow(); 537 538 long now=System.currentTimeMillis(); 539 540 // if no immediate things to do 541 if (selected==0 && selector.selectedKeys().isEmpty()) 542 { 543 // If we are in pausing mode 544 if (_pausing) 545 { 546 try 547 { 548 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop 549 } 550 catch(InterruptedException e) 551 { 552 LOG.ignore(e); 553 } 554 now=System.currentTimeMillis(); 555 } 556 557 // workout how long to wait in select 558 _timeout.setNow(now); 559 long to_next_timeout=_timeout.getTimeToNext(); 560 561 long wait = _changes.size()==0?__IDLE_TICK:0L; 562 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) 563 wait = to_next_timeout; 564 565 // If we should wait with a select 566 if (wait>0) 567 { 568 long before=now; 569 selector.select(wait); 570 now = System.currentTimeMillis(); 571 _timeout.setNow(now); 572 573 // If we are monitoring for busy selector 574 // and this select did not wait more than 1ms 575 if (__MONITOR_PERIOD>0 && now-before <=1) 576 { 577 // count this as a busy select and if there have been too many this monitor cycle 578 if (++_busySelects>__MAX_SELECTS) 579 { 580 // Start injecting pauses 581 _pausing=true; 582 583 // if this is the first pause 584 if (!_paused) 585 { 586 // Log and dump some status 587 _paused=true; 588 LOG.warn("Selector {} is too busy, pausing!",this); 589 } 590 } 591 } 592 } 593 } 594 595 // have we been destroyed while sleeping 596 if (_selector==null || !selector.isOpen()) 597 return; 598 599 // Look for things to do 600 for (SelectionKey key: selector.selectedKeys()) 601 { 602 SocketChannel channel=null; 603 604 try 605 { 606 if (!key.isValid()) 607 { 608 key.cancel(); 609 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); 610 if (endpoint != null) 611 endpoint.doUpdateKey(); 612 continue; 613 } 614 615 Object att = key.attachment(); 616 if (att instanceof SelectChannelEndPoint) 617 { 618 if (key.isReadable()||key.isWritable()) 619 ((SelectChannelEndPoint)att).schedule(); 620 } 621 else if (key.isConnectable()) 622 { 623 // Complete a connection of a registered channel 624 channel = (SocketChannel)key.channel(); 625 boolean connected=false; 626 try 627 { 628 connected=channel.finishConnect(); 629 } 630 catch(Exception e) 631 { 632 connectionFailed(channel,e,att); 633 } 634 finally 635 { 636 if (connected) 637 { 638 key.interestOps(SelectionKey.OP_READ); 639 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 640 key.attach(endpoint); 641 endpoint.schedule(); 642 } 643 else 644 { 645 key.cancel(); 646 channel.close(); 647 } 648 } 649 } 650 else 651 { 652 // Wrap readable registered channel in an endpoint 653 channel = (SocketChannel)key.channel(); 654 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 655 key.attach(endpoint); 656 if (key.isReadable()) 657 endpoint.schedule(); 658 } 659 key = null; 660 } 661 catch (CancelledKeyException e) 662 { 663 LOG.ignore(e); 664 } 665 catch (Exception e) 666 { 667 if (isRunning()) 668 LOG.warn(e); 669 else 670 LOG.ignore(e); 671 672 try 673 { 674 if (channel!=null) 675 channel.close(); 676 } 677 catch(IOException e2) 678 { 679 LOG.debug(e2); 680 } 681 682 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) 683 key.cancel(); 684 } 685 } 686 687 // Everything always handled 688 selector.selectedKeys().clear(); 689 690 now=System.currentTimeMillis(); 691 _timeout.setNow(now); 692 Task task = _timeout.expired(); 693 while (task!=null) 694 { 695 if (task instanceof Runnable) 696 dispatch((Runnable)task); 697 task = _timeout.expired(); 698 } 699 700 // Idle tick 701 if (now-_idleTick>__IDLE_TICK) 702 { 703 _idleTick=now; 704 705 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) 706 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) 707 :now; 708 709 dispatch(new Runnable() 710 { 711 public void run() 712 { 713 for (SelectChannelEndPoint endp:_endPoints.keySet()) 714 { 715 endp.checkIdleTimestamp(idle_now); 716 } 717 } 718 public String toString() {return "Idle-"+super.toString();} 719 }); 720 721 } 722 723 // Reset busy select monitor counts 724 if (__MONITOR_PERIOD>0 && now>_monitorNext) 725 { 726 _busySelects=0; 727 _pausing=false; 728 _monitorNext=now+__MONITOR_PERIOD; 729 730 } 731 } 732 catch (ClosedSelectorException e) 733 { 734 if (isRunning()) 735 LOG.warn(e); 736 else 737 LOG.ignore(e); 738 } 739 catch (CancelledKeyException e) 740 { 741 LOG.ignore(e); 742 } 743 finally 744 { 745 _selecting=null; 746 } 747 } 748 749 750 /* ------------------------------------------------------------ */ renewSelector()751 private void renewSelector() 752 { 753 try 754 { 755 synchronized (this) 756 { 757 Selector selector=_selector; 758 if (selector==null) 759 return; 760 final Selector new_selector = Selector.open(); 761 for (SelectionKey k: selector.keys()) 762 { 763 if (!k.isValid() || k.interestOps()==0) 764 continue; 765 766 final SelectableChannel channel = k.channel(); 767 final Object attachment = k.attachment(); 768 769 if (attachment==null) 770 addChange(channel); 771 else 772 addChange(channel,attachment); 773 } 774 _selector.close(); 775 _selector=new_selector; 776 } 777 } 778 catch(IOException e) 779 { 780 throw new RuntimeException("recreating selector",e); 781 } 782 } 783 784 /* ------------------------------------------------------------ */ getManager()785 public SelectorManager getManager() 786 { 787 return SelectorManager.this; 788 } 789 790 /* ------------------------------------------------------------ */ getNow()791 public long getNow() 792 { 793 return _timeout.getNow(); 794 } 795 796 /* ------------------------------------------------------------ */ 797 /** 798 * @param task The task to timeout. If it implements Runnable, then 799 * expired will be called from a dispatched thread. 800 * 801 * @param timeoutMs 802 */ scheduleTimeout(Timeout.Task task, long timeoutMs)803 public void scheduleTimeout(Timeout.Task task, long timeoutMs) 804 { 805 if (!(task instanceof Runnable)) 806 throw new IllegalArgumentException("!Runnable"); 807 _timeout.schedule(task, timeoutMs); 808 } 809 810 /* ------------------------------------------------------------ */ cancelTimeout(Timeout.Task task)811 public void cancelTimeout(Timeout.Task task) 812 { 813 task.cancel(); 814 } 815 816 /* ------------------------------------------------------------ */ wakeup()817 public void wakeup() 818 { 819 try 820 { 821 Selector selector = _selector; 822 if (selector!=null) 823 selector.wakeup(); 824 } 825 catch(Exception e) 826 { 827 addChange(new ChangeTask() 828 { 829 public void run() 830 { 831 renewSelector(); 832 } 833 }); 834 835 renewSelector(); 836 } 837 } 838 839 /* ------------------------------------------------------------ */ createEndPoint(SocketChannel channel, SelectionKey sKey)840 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException 841 { 842 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); 843 LOG.debug("created {}",endp); 844 endPointOpened(endp); 845 _endPoints.put(endp,this); 846 return endp; 847 } 848 849 /* ------------------------------------------------------------ */ destroyEndPoint(SelectChannelEndPoint endp)850 public void destroyEndPoint(SelectChannelEndPoint endp) 851 { 852 LOG.debug("destroyEndPoint {}",endp); 853 _endPoints.remove(endp); 854 endPointClosed(endp); 855 } 856 857 /* ------------------------------------------------------------ */ getSelector()858 Selector getSelector() 859 { 860 return _selector; 861 } 862 863 /* ------------------------------------------------------------ */ stop()864 void stop() throws Exception 865 { 866 // Spin for a while waiting for selector to complete 867 // to avoid unneccessary closed channel exceptions 868 try 869 { 870 for (int i=0;i<100 && _selecting!=null;i++) 871 { 872 wakeup(); 873 Thread.sleep(10); 874 } 875 } 876 catch(Exception e) 877 { 878 LOG.ignore(e); 879 } 880 881 // close endpoints and selector 882 synchronized (this) 883 { 884 Selector selector=_selector; 885 for (SelectionKey key:selector.keys()) 886 { 887 if (key==null) 888 continue; 889 Object att=key.attachment(); 890 if (att instanceof EndPoint) 891 { 892 EndPoint endpoint = (EndPoint)att; 893 try 894 { 895 endpoint.close(); 896 } 897 catch(IOException e) 898 { 899 LOG.ignore(e); 900 } 901 } 902 } 903 904 905 _timeout.cancelAll(); 906 try 907 { 908 selector=_selector; 909 if (selector != null) 910 selector.close(); 911 } 912 catch (IOException e) 913 { 914 LOG.ignore(e); 915 } 916 _selector=null; 917 } 918 } 919 920 /* ------------------------------------------------------------ */ dump()921 public String dump() 922 { 923 return AggregateLifeCycle.dump(this); 924 } 925 926 /* ------------------------------------------------------------ */ dump(Appendable out, String indent)927 public void dump(Appendable out, String indent) throws IOException 928 { 929 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); 930 931 Thread selecting = _selecting; 932 933 Object where = "not selecting"; 934 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); 935 if (trace!=null) 936 { 937 for (StackTraceElement t:trace) 938 if (t.getClassName().startsWith("org.eclipse.jetty.")) 939 { 940 where=t; 941 break; 942 } 943 } 944 945 Selector selector=_selector; 946 if (selector!=null) 947 { 948 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); 949 dump.add(where); 950 951 final CountDownLatch latch = new CountDownLatch(1); 952 953 addChange(new ChangeTask() 954 { 955 public void run() 956 { 957 dumpKeyState(dump); 958 latch.countDown(); 959 } 960 }); 961 962 try 963 { 964 latch.await(5,TimeUnit.SECONDS); 965 } 966 catch(InterruptedException e) 967 { 968 LOG.ignore(e); 969 } 970 971 AggregateLifeCycle.dump(out,indent,dump); 972 } 973 } 974 975 /* ------------------------------------------------------------ */ dumpKeyState(List<Object> dumpto)976 public void dumpKeyState(List<Object> dumpto) 977 { 978 Selector selector=_selector; 979 Set<SelectionKey> keys = selector.keys(); 980 dumpto.add(selector + " keys=" + keys.size()); 981 for (SelectionKey key: keys) 982 { 983 if (key.isValid()) 984 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); 985 else 986 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); 987 } 988 } 989 990 /* ------------------------------------------------------------ */ toString()991 public String toString() 992 { 993 Selector selector=_selector; 994 return String.format("%s keys=%d selected=%d", 995 super.toString(), 996 selector != null && selector.isOpen() ? selector.keys().size() : -1, 997 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); 998 } 999 } 1000 1001 /* ------------------------------------------------------------ */ 1002 private static class ChannelAndAttachment 1003 { 1004 final SelectableChannel _channel; 1005 final Object _attachment; 1006 ChannelAndAttachment(SelectableChannel channel, Object attachment)1007 public ChannelAndAttachment(SelectableChannel channel, Object attachment) 1008 { 1009 super(); 1010 _channel = channel; 1011 _attachment = attachment; 1012 } 1013 } 1014 1015 /* ------------------------------------------------------------ */ isDeferringInterestedOps0()1016 public boolean isDeferringInterestedOps0() 1017 { 1018 return _deferringInterestedOps0; 1019 } 1020 1021 /* ------------------------------------------------------------ */ setDeferringInterestedOps0(boolean deferringInterestedOps0)1022 public void setDeferringInterestedOps0(boolean deferringInterestedOps0) 1023 { 1024 _deferringInterestedOps0 = deferringInterestedOps0; 1025 } 1026 1027 1028 /* ------------------------------------------------------------ */ 1029 /* ------------------------------------------------------------ */ 1030 /* ------------------------------------------------------------ */ 1031 private interface ChangeTask extends Runnable 1032 {} 1033 1034 } 1035