• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 package org.eclipse.jetty.io.nio;
20 
21 import java.io.IOException;
22 import java.nio.channels.CancelledKeyException;
23 import java.nio.channels.Channel;
24 import java.nio.channels.ClosedSelectorException;
25 import java.nio.channels.SelectableChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.ServerSocketChannel;
29 import java.nio.channels.SocketChannel;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 
39 import org.eclipse.jetty.io.AsyncEndPoint;
40 import org.eclipse.jetty.io.ConnectedEndPoint;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.AggregateLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.Timeout;
50 import org.eclipse.jetty.util.thread.Timeout.Task;
51 
52 
53 /* ------------------------------------------------------------ */
54 /**
55  * The Selector Manager manages a number of SelectSets to allow
56  * NIO scheduling to scale to large numbers of connections.
57  * <p>
58  */
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60 {
61     public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
62 
63     private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
64     private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
65     private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
66     private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
67 
68     private int _maxIdleTime;
69     private int _lowResourcesMaxIdleTime;
70     private long _lowResourcesConnections;
71     private SelectSet[] _selectSet;
72     private int _selectSets=1;
73     private volatile int _set=0;
74     private boolean _deferringInterestedOps0=true;
75     private int _selectorPriorityDelta=0;
76 
77     /* ------------------------------------------------------------ */
78     /**
79      * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
80      * @see #setLowResourcesMaxIdleTime(long)
81      */
setMaxIdleTime(long maxIdleTime)82     public void setMaxIdleTime(long maxIdleTime)
83     {
84         _maxIdleTime=(int)maxIdleTime;
85     }
86 
87     /* ------------------------------------------------------------ */
88     /**
89      * @param selectSets number of select sets to create
90      */
setSelectSets(int selectSets)91     public void setSelectSets(int selectSets)
92     {
93         long lrc = _lowResourcesConnections * _selectSets;
94         _selectSets=selectSets;
95         _lowResourcesConnections=lrc/_selectSets;
96     }
97 
98     /* ------------------------------------------------------------ */
99     /**
100      * @return the max idle time
101      */
getMaxIdleTime()102     public long getMaxIdleTime()
103     {
104         return _maxIdleTime;
105     }
106 
107     /* ------------------------------------------------------------ */
108     /**
109      * @return the number of select sets in use
110      */
getSelectSets()111     public int getSelectSets()
112     {
113         return _selectSets;
114     }
115 
116     /* ------------------------------------------------------------ */
117     /**
118      * @param i
119      * @return The select set
120      */
getSelectSet(int i)121     public SelectSet getSelectSet(int i)
122     {
123         return _selectSet[i];
124     }
125 
126     /* ------------------------------------------------------------ */
127     /** Register a channel
128      * @param channel
129      * @param att Attached Object
130      */
register(SocketChannel channel, Object att)131     public void register(SocketChannel channel, Object att)
132     {
133         // The ++ increment here is not atomic, but it does not matter.
134         // so long as the value changes sometimes, then connections will
135         // be distributed over the available sets.
136 
137         int s=_set++;
138         if (s<0)
139             s=-s;
140         s=s%_selectSets;
141         SelectSet[] sets=_selectSet;
142         if (sets!=null)
143         {
144             SelectSet set=sets[s];
145             set.addChange(channel,att);
146             set.wakeup();
147         }
148     }
149 
150 
151     /* ------------------------------------------------------------ */
152     /** Register a channel
153      * @param channel
154      */
register(SocketChannel channel)155     public void register(SocketChannel channel)
156     {
157         // The ++ increment here is not atomic, but it does not matter.
158         // so long as the value changes sometimes, then connections will
159         // be distributed over the available sets.
160 
161         int s=_set++;
162         if (s<0)
163             s=-s;
164         s=s%_selectSets;
165         SelectSet[] sets=_selectSet;
166         if (sets!=null)
167         {
168             SelectSet set=sets[s];
169             set.addChange(channel);
170             set.wakeup();
171         }
172     }
173 
174     /* ------------------------------------------------------------ */
175     /** Register a {@link ServerSocketChannel}
176      * @param acceptChannel
177      */
register(ServerSocketChannel acceptChannel)178     public void register(ServerSocketChannel acceptChannel)
179     {
180         int s=_set++;
181         if (s<0)
182             s=-s;
183         s=s%_selectSets;
184         SelectSet set=_selectSet[s];
185         set.addChange(acceptChannel);
186         set.wakeup();
187     }
188 
189     /* ------------------------------------------------------------ */
190     /**
191      * @return delta The value to add to the selector thread priority.
192      */
getSelectorPriorityDelta()193     public int getSelectorPriorityDelta()
194     {
195         return _selectorPriorityDelta;
196     }
197 
198     /* ------------------------------------------------------------ */
199     /** Set the selector thread priorty delta.
200      * @param delta The value to add to the selector thread priority.
201      */
setSelectorPriorityDelta(int delta)202     public void setSelectorPriorityDelta(int delta)
203     {
204         _selectorPriorityDelta=delta;
205     }
206 
207 
208     /* ------------------------------------------------------------ */
209     /**
210      * @return the lowResourcesConnections
211      */
getLowResourcesConnections()212     public long getLowResourcesConnections()
213     {
214         return _lowResourcesConnections*_selectSets;
215     }
216 
217     /* ------------------------------------------------------------ */
218     /**
219      * Set the number of connections, which if exceeded places this manager in low resources state.
220      * This is not an exact measure as the connection count is averaged over the select sets.
221      * @param lowResourcesConnections the number of connections
222      * @see #setLowResourcesMaxIdleTime(long)
223      */
setLowResourcesConnections(long lowResourcesConnections)224     public void setLowResourcesConnections(long lowResourcesConnections)
225     {
226         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * @return the lowResourcesMaxIdleTime
232      */
getLowResourcesMaxIdleTime()233     public long getLowResourcesMaxIdleTime()
234     {
235         return _lowResourcesMaxIdleTime;
236     }
237 
238     /* ------------------------------------------------------------ */
239     /**
240      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
241      * @see #setMaxIdleTime(long)
242      */
setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)243     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
244     {
245         _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
246     }
247 
248 
249     /* ------------------------------------------------------------------------------- */
dispatch(Runnable task)250     public abstract boolean dispatch(Runnable task);
251 
252     /* ------------------------------------------------------------ */
253     /* (non-Javadoc)
254      * @see org.eclipse.component.AbstractLifeCycle#doStart()
255      */
256     @Override
doStart()257     protected void doStart() throws Exception
258     {
259         _selectSet = new SelectSet[_selectSets];
260         for (int i=0;i<_selectSet.length;i++)
261             _selectSet[i]= new SelectSet(i);
262 
263         super.doStart();
264 
265         // start a thread to Select
266         for (int i=0;i<getSelectSets();i++)
267         {
268             final int id=i;
269             boolean selecting=dispatch(new Runnable()
270             {
271                 public void run()
272                 {
273                     String name=Thread.currentThread().getName();
274                     int priority=Thread.currentThread().getPriority();
275                     try
276                     {
277                         SelectSet[] sets=_selectSet;
278                         if (sets==null)
279                             return;
280                         SelectSet set=sets[id];
281 
282                         Thread.currentThread().setName(name+" Selector"+id);
283                         if (getSelectorPriorityDelta()!=0)
284                             Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
285                         LOG.debug("Starting {} on {}",Thread.currentThread(),this);
286                         while (isRunning())
287                         {
288                             try
289                             {
290                                 set.doSelect();
291                             }
292                             catch(IOException e)
293                             {
294                                 LOG.ignore(e);
295                             }
296                             catch(Exception e)
297                             {
298                                 LOG.warn(e);
299                             }
300                         }
301                     }
302                     finally
303                     {
304                         LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
305                         Thread.currentThread().setName(name);
306                         if (getSelectorPriorityDelta()!=0)
307                             Thread.currentThread().setPriority(priority);
308                     }
309                 }
310 
311             });
312 
313             if (!selecting)
314                 throw new IllegalStateException("!Selecting");
315         }
316     }
317 
318 
319     /* ------------------------------------------------------------------------------- */
320     @Override
doStop()321     protected void doStop() throws Exception
322     {
323         SelectSet[] sets= _selectSet;
324         _selectSet=null;
325         if (sets!=null)
326         {
327             for (SelectSet set : sets)
328             {
329                 if (set!=null)
330                     set.stop();
331             }
332         }
333         super.doStop();
334     }
335 
336     /* ------------------------------------------------------------ */
337     /**
338      * @param endpoint
339      */
endPointClosed(SelectChannelEndPoint endpoint)340     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
341 
342     /* ------------------------------------------------------------ */
343     /**
344      * @param endpoint
345      */
endPointOpened(SelectChannelEndPoint endpoint)346     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
347 
348     /* ------------------------------------------------------------ */
endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection)349     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
350 
351     /* ------------------------------------------------------------------------------- */
newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)352     public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
353 
354     /* ------------------------------------------------------------ */
355     /**
356      * Create a new end point
357      * @param channel
358      * @param selectSet
359      * @param sKey the selection key
360      * @return the new endpoint {@link SelectChannelEndPoint}
361      * @throws IOException
362      */
newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey)363     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
364 
365     /* ------------------------------------------------------------------------------- */
connectionFailed(SocketChannel channel,Throwable ex,Object attachment)366     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
367     {
368         LOG.warn(ex+","+channel+","+attachment);
369         LOG.debug(ex);
370     }
371 
372     /* ------------------------------------------------------------ */
dump()373     public String dump()
374     {
375         return AggregateLifeCycle.dump(this);
376     }
377 
378     /* ------------------------------------------------------------ */
dump(Appendable out, String indent)379     public void dump(Appendable out, String indent) throws IOException
380     {
381         AggregateLifeCycle.dumpObject(out,this);
382         AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
383     }
384 
385 
386     /* ------------------------------------------------------------------------------- */
387     /* ------------------------------------------------------------------------------- */
388     /* ------------------------------------------------------------------------------- */
389     public class SelectSet implements Dumpable
390     {
391         private final int _setID;
392         private final Timeout _timeout;
393 
394         private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
395 
396         private volatile Selector _selector;
397 
398         private volatile Thread _selecting;
399         private int _busySelects;
400         private long _monitorNext;
401         private boolean _pausing;
402         private boolean _paused;
403         private volatile long _idleTick;
404         private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
405 
406         /* ------------------------------------------------------------ */
SelectSet(int acceptorID)407         SelectSet(int acceptorID) throws Exception
408         {
409             _setID=acceptorID;
410 
411             _idleTick = System.currentTimeMillis();
412             _timeout = new Timeout(this);
413             _timeout.setDuration(0L);
414 
415             // create a selector;
416             _selector = Selector.open();
417             _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
418         }
419 
420         /* ------------------------------------------------------------ */
addChange(Object change)421         public void addChange(Object change)
422         {
423             _changes.add(change);
424         }
425 
426         /* ------------------------------------------------------------ */
addChange(SelectableChannel channel, Object att)427         public void addChange(SelectableChannel channel, Object att)
428         {
429             if (att==null)
430                 addChange(channel);
431             else if (att instanceof EndPoint)
432                 addChange(att);
433             else
434                 addChange(new ChannelAndAttachment(channel,att));
435         }
436 
437         /* ------------------------------------------------------------ */
438         /**
439          * Select and dispatch tasks found from changes and the selector.
440          *
441          * @throws IOException
442          */
doSelect()443         public void doSelect() throws IOException
444         {
445             try
446             {
447                 _selecting=Thread.currentThread();
448                 final Selector selector=_selector;
449                 // Stopped concurrently ?
450                 if (selector == null)
451                     return;
452 
453                 // Make any key changes required
454                 Object change;
455                 int changes=_changes.size();
456                 while (changes-->0 && (change=_changes.poll())!=null)
457                 {
458                     Channel ch=null;
459                     SelectionKey key=null;
460 
461                     try
462                     {
463                         if (change instanceof EndPoint)
464                         {
465                             // Update the operations for a key.
466                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
467                             ch=endpoint.getChannel();
468                             endpoint.doUpdateKey();
469                         }
470                         else if (change instanceof ChannelAndAttachment)
471                         {
472                             // finish accepting/connecting this connection
473                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
474                             final SelectableChannel channel=asc._channel;
475                             ch=channel;
476                             final Object att = asc._attachment;
477 
478                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
479                             {
480                                 key = channel.register(selector,SelectionKey.OP_READ,att);
481                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
482                                 key.attach(endpoint);
483                                 endpoint.schedule();
484                             }
485                             else if (channel.isOpen())
486                             {
487                                 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
488                             }
489                         }
490                         else if (change instanceof SocketChannel)
491                         {
492                             // Newly registered channel
493                             final SocketChannel channel=(SocketChannel)change;
494                             ch=channel;
495                             key = channel.register(selector,SelectionKey.OP_READ,null);
496                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
497                             key.attach(endpoint);
498                             endpoint.schedule();
499                         }
500                         else if (change instanceof ChangeTask)
501                         {
502                             ((Runnable)change).run();
503                         }
504                         else if (change instanceof Runnable)
505                         {
506                             dispatch((Runnable)change);
507                         }
508                         else
509                             throw new IllegalArgumentException(change.toString());
510                     }
511                     catch (CancelledKeyException e)
512                     {
513                         LOG.ignore(e);
514                     }
515                     catch (Throwable e)
516                     {
517                         if (isRunning())
518                             LOG.warn(e);
519                         else
520                             LOG.debug(e);
521 
522                         try
523                         {
524                             if (ch!=null)
525                                 ch.close();
526                         }
527                         catch(IOException e2)
528                         {
529                             LOG.debug(e2);
530                         }
531                     }
532                 }
533 
534 
535                 // Do and instant select to see if any connections can be handled.
536                 int selected=selector.selectNow();
537 
538                 long now=System.currentTimeMillis();
539 
540                 // if no immediate things to do
541                 if (selected==0 && selector.selectedKeys().isEmpty())
542                 {
543                     // If we are in pausing mode
544                     if (_pausing)
545                     {
546                         try
547                         {
548                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
549                         }
550                         catch(InterruptedException e)
551                         {
552                             LOG.ignore(e);
553                         }
554                         now=System.currentTimeMillis();
555                     }
556 
557                     // workout how long to wait in select
558                     _timeout.setNow(now);
559                     long to_next_timeout=_timeout.getTimeToNext();
560 
561                     long wait = _changes.size()==0?__IDLE_TICK:0L;
562                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
563                         wait = to_next_timeout;
564 
565                     // If we should wait with a select
566                     if (wait>0)
567                     {
568                         long before=now;
569                         selector.select(wait);
570                         now = System.currentTimeMillis();
571                         _timeout.setNow(now);
572 
573                         // If we are monitoring for busy selector
574                         // and this select did not wait more than 1ms
575                         if (__MONITOR_PERIOD>0 && now-before <=1)
576                         {
577                             // count this as a busy select and if there have been too many this monitor cycle
578                             if (++_busySelects>__MAX_SELECTS)
579                             {
580                                 // Start injecting pauses
581                                 _pausing=true;
582 
583                                 // if this is the first pause
584                                 if (!_paused)
585                                 {
586                                     // Log and dump some status
587                                     _paused=true;
588                                     LOG.warn("Selector {} is too busy, pausing!",this);
589                                 }
590                             }
591                         }
592                     }
593                 }
594 
595                 // have we been destroyed while sleeping
596                 if (_selector==null || !selector.isOpen())
597                     return;
598 
599                 // Look for things to do
600                 for (SelectionKey key: selector.selectedKeys())
601                 {
602                     SocketChannel channel=null;
603 
604                     try
605                     {
606                         if (!key.isValid())
607                         {
608                             key.cancel();
609                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
610                             if (endpoint != null)
611                                 endpoint.doUpdateKey();
612                             continue;
613                         }
614 
615                         Object att = key.attachment();
616                         if (att instanceof SelectChannelEndPoint)
617                         {
618                             if (key.isReadable()||key.isWritable())
619                                 ((SelectChannelEndPoint)att).schedule();
620                         }
621                         else if (key.isConnectable())
622                         {
623                             // Complete a connection of a registered channel
624                             channel = (SocketChannel)key.channel();
625                             boolean connected=false;
626                             try
627                             {
628                                 connected=channel.finishConnect();
629                             }
630                             catch(Exception e)
631                             {
632                                 connectionFailed(channel,e,att);
633                             }
634                             finally
635                             {
636                                 if (connected)
637                                 {
638                                     key.interestOps(SelectionKey.OP_READ);
639                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
640                                     key.attach(endpoint);
641                                     endpoint.schedule();
642                                 }
643                                 else
644                                 {
645                                     key.cancel();
646                                     channel.close();
647                                 }
648                             }
649                         }
650                         else
651                         {
652                             // Wrap readable registered channel in an endpoint
653                             channel = (SocketChannel)key.channel();
654                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
655                             key.attach(endpoint);
656                             if (key.isReadable())
657                                 endpoint.schedule();
658                         }
659                         key = null;
660                     }
661                     catch (CancelledKeyException e)
662                     {
663                         LOG.ignore(e);
664                     }
665                     catch (Exception e)
666                     {
667                         if (isRunning())
668                             LOG.warn(e);
669                         else
670                             LOG.ignore(e);
671 
672                         try
673                         {
674                             if (channel!=null)
675                                 channel.close();
676                         }
677                         catch(IOException e2)
678                         {
679                             LOG.debug(e2);
680                         }
681 
682                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
683                             key.cancel();
684                     }
685                 }
686 
687                 // Everything always handled
688                 selector.selectedKeys().clear();
689 
690                 now=System.currentTimeMillis();
691                 _timeout.setNow(now);
692                 Task task = _timeout.expired();
693                 while (task!=null)
694                 {
695                     if (task instanceof Runnable)
696                         dispatch((Runnable)task);
697                     task = _timeout.expired();
698                 }
699 
700                 // Idle tick
701                 if (now-_idleTick>__IDLE_TICK)
702                 {
703                     _idleTick=now;
704 
705                     final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
706                         ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
707                         :now;
708 
709                     dispatch(new Runnable()
710                     {
711                         public void run()
712                         {
713                             for (SelectChannelEndPoint endp:_endPoints.keySet())
714                             {
715                                 endp.checkIdleTimestamp(idle_now);
716                             }
717                         }
718                         public String toString() {return "Idle-"+super.toString();}
719                     });
720 
721                 }
722 
723                 // Reset busy select monitor counts
724                 if (__MONITOR_PERIOD>0 && now>_monitorNext)
725                 {
726                     _busySelects=0;
727                     _pausing=false;
728                     _monitorNext=now+__MONITOR_PERIOD;
729 
730                 }
731             }
732             catch (ClosedSelectorException e)
733             {
734                 if (isRunning())
735                     LOG.warn(e);
736                 else
737                     LOG.ignore(e);
738             }
739             catch (CancelledKeyException e)
740             {
741                 LOG.ignore(e);
742             }
743             finally
744             {
745                 _selecting=null;
746             }
747         }
748 
749 
750         /* ------------------------------------------------------------ */
renewSelector()751         private void renewSelector()
752         {
753             try
754             {
755                 synchronized (this)
756                 {
757                     Selector selector=_selector;
758                     if (selector==null)
759                         return;
760                     final Selector new_selector = Selector.open();
761                     for (SelectionKey k: selector.keys())
762                     {
763                         if (!k.isValid() || k.interestOps()==0)
764                             continue;
765 
766                         final SelectableChannel channel = k.channel();
767                         final Object attachment = k.attachment();
768 
769                         if (attachment==null)
770                             addChange(channel);
771                         else
772                             addChange(channel,attachment);
773                     }
774                     _selector.close();
775                     _selector=new_selector;
776                 }
777             }
778             catch(IOException e)
779             {
780                 throw new RuntimeException("recreating selector",e);
781             }
782         }
783 
784         /* ------------------------------------------------------------ */
getManager()785         public SelectorManager getManager()
786         {
787             return SelectorManager.this;
788         }
789 
790         /* ------------------------------------------------------------ */
getNow()791         public long getNow()
792         {
793             return _timeout.getNow();
794         }
795 
796         /* ------------------------------------------------------------ */
797         /**
798          * @param task The task to timeout. If it implements Runnable, then
799          * expired will be called from a dispatched thread.
800          *
801          * @param timeoutMs
802          */
scheduleTimeout(Timeout.Task task, long timeoutMs)803         public void scheduleTimeout(Timeout.Task task, long timeoutMs)
804         {
805             if (!(task instanceof Runnable))
806                 throw new IllegalArgumentException("!Runnable");
807             _timeout.schedule(task, timeoutMs);
808         }
809 
810         /* ------------------------------------------------------------ */
cancelTimeout(Timeout.Task task)811         public void cancelTimeout(Timeout.Task task)
812         {
813             task.cancel();
814         }
815 
816         /* ------------------------------------------------------------ */
wakeup()817         public void wakeup()
818         {
819             try
820             {
821                 Selector selector = _selector;
822                 if (selector!=null)
823                     selector.wakeup();
824             }
825             catch(Exception e)
826             {
827                 addChange(new ChangeTask()
828                 {
829                     public void run()
830                     {
831                         renewSelector();
832                     }
833                 });
834 
835                 renewSelector();
836             }
837         }
838 
839         /* ------------------------------------------------------------ */
createEndPoint(SocketChannel channel, SelectionKey sKey)840         private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
841         {
842             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
843             LOG.debug("created {}",endp);
844             endPointOpened(endp);
845             _endPoints.put(endp,this);
846             return endp;
847         }
848 
849         /* ------------------------------------------------------------ */
destroyEndPoint(SelectChannelEndPoint endp)850         public void destroyEndPoint(SelectChannelEndPoint endp)
851         {
852             LOG.debug("destroyEndPoint {}",endp);
853             _endPoints.remove(endp);
854             endPointClosed(endp);
855         }
856 
857         /* ------------------------------------------------------------ */
getSelector()858         Selector getSelector()
859         {
860             return _selector;
861         }
862 
863         /* ------------------------------------------------------------ */
stop()864         void stop() throws Exception
865         {
866             // Spin for a while waiting for selector to complete
867             // to avoid unneccessary closed channel exceptions
868             try
869             {
870                 for (int i=0;i<100 && _selecting!=null;i++)
871                 {
872                     wakeup();
873                     Thread.sleep(10);
874                 }
875             }
876             catch(Exception e)
877             {
878                 LOG.ignore(e);
879             }
880 
881             // close endpoints and selector
882             synchronized (this)
883             {
884                 Selector selector=_selector;
885                 for (SelectionKey key:selector.keys())
886                 {
887                     if (key==null)
888                         continue;
889                     Object att=key.attachment();
890                     if (att instanceof EndPoint)
891                     {
892                         EndPoint endpoint = (EndPoint)att;
893                         try
894                         {
895                             endpoint.close();
896                         }
897                         catch(IOException e)
898                         {
899                             LOG.ignore(e);
900                         }
901                     }
902                 }
903 
904 
905                 _timeout.cancelAll();
906                 try
907                 {
908                     selector=_selector;
909                     if (selector != null)
910                         selector.close();
911                 }
912                 catch (IOException e)
913                 {
914                     LOG.ignore(e);
915                 }
916                 _selector=null;
917             }
918         }
919 
920         /* ------------------------------------------------------------ */
dump()921         public String dump()
922         {
923             return AggregateLifeCycle.dump(this);
924         }
925 
926         /* ------------------------------------------------------------ */
dump(Appendable out, String indent)927         public void dump(Appendable out, String indent) throws IOException
928         {
929             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
930 
931             Thread selecting = _selecting;
932 
933             Object where = "not selecting";
934             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
935             if (trace!=null)
936             {
937                 for (StackTraceElement t:trace)
938                     if (t.getClassName().startsWith("org.eclipse.jetty."))
939                     {
940                         where=t;
941                         break;
942                     }
943             }
944 
945             Selector selector=_selector;
946             if (selector!=null)
947             {
948                 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
949                 dump.add(where);
950 
951                 final CountDownLatch latch = new CountDownLatch(1);
952 
953                 addChange(new ChangeTask()
954                 {
955                     public void run()
956                     {
957                         dumpKeyState(dump);
958                         latch.countDown();
959                     }
960                 });
961 
962                 try
963                 {
964                     latch.await(5,TimeUnit.SECONDS);
965                 }
966                 catch(InterruptedException e)
967                 {
968                     LOG.ignore(e);
969                 }
970 
971                 AggregateLifeCycle.dump(out,indent,dump);
972             }
973         }
974 
975         /* ------------------------------------------------------------ */
dumpKeyState(List<Object> dumpto)976         public void dumpKeyState(List<Object> dumpto)
977         {
978             Selector selector=_selector;
979             Set<SelectionKey> keys = selector.keys();
980             dumpto.add(selector + " keys=" + keys.size());
981             for (SelectionKey key: keys)
982             {
983                 if (key.isValid())
984                     dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
985                 else
986                     dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
987             }
988         }
989 
990         /* ------------------------------------------------------------ */
toString()991         public String toString()
992         {
993             Selector selector=_selector;
994             return String.format("%s keys=%d selected=%d",
995                     super.toString(),
996                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
997                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
998         }
999     }
1000 
1001     /* ------------------------------------------------------------ */
1002     private static class ChannelAndAttachment
1003     {
1004         final SelectableChannel _channel;
1005         final Object _attachment;
1006 
ChannelAndAttachment(SelectableChannel channel, Object attachment)1007         public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1008         {
1009             super();
1010             _channel = channel;
1011             _attachment = attachment;
1012         }
1013     }
1014 
1015     /* ------------------------------------------------------------ */
isDeferringInterestedOps0()1016     public boolean isDeferringInterestedOps0()
1017     {
1018         return _deferringInterestedOps0;
1019     }
1020 
1021     /* ------------------------------------------------------------ */
setDeferringInterestedOps0(boolean deferringInterestedOps0)1022     public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1023     {
1024         _deferringInterestedOps0 = deferringInterestedOps0;
1025     }
1026 
1027 
1028     /* ------------------------------------------------------------ */
1029     /* ------------------------------------------------------------ */
1030     /* ------------------------------------------------------------ */
1031     private interface ChangeTask extends Runnable
1032     {}
1033 
1034 }
1035