1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18 
19 package org.eclipse.jetty.io.nio;
20 
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.SocketChannel;
27 import java.util.Locale;
28 
29 import org.eclipse.jetty.io.AsyncEndPoint;
30 import org.eclipse.jetty.io.Buffer;
31 import org.eclipse.jetty.io.ConnectedEndPoint;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.io.EofException;
34 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.util.thread.Timeout.Task;
38 
39 /* ------------------------------------------------------------ */
40 /**
41  * An Endpoint that can be scheduled by {@link SelectorManager}.
42  */
43 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
44 {
45     public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
46 
47     private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
48     private final SelectorManager.SelectSet _selectSet;
49     private final SelectorManager _manager;
50     private  SelectionKey _key;
51     private final Runnable _handler = new Runnable()
52         {
53             public void run() { handle(); }
54         };
55 
56     /** The desired value for {@link SelectionKey#interestOps()} */
57     private int _interestOps;
58 
59     /**
60      * The connection instance is the handler for any IO activity on the endpoint.
61      * There is a different type of connection for HTTP, AJP, WebSocket and
62      * ProxyConnect.   The connection may change for an SCEP as it is upgraded
63      * from HTTP to proxy connect or websocket.
64      */
65     private volatile AsyncConnection _connection;
66 
67     private static final int STATE_NEEDS_DISPATCH=-1;
68     private static final int STATE_UNDISPATCHED=0;
69     private static final int STATE_DISPATCHED=1;
70     private static final int STATE_ASYNC=2;
71     private int _state;
72 
73     private boolean _onIdle;
74 
75     /** true if the last write operation succeed and wrote all offered bytes */
76     private volatile boolean _writable = true;
77 
78 
79     /** True if a thread has is blocked in {@link #blockReadable(long)} */
80     private boolean _readBlocked;
81 
82     /** True if a thread has is blocked in {@link #blockWritable(long)} */
83     private boolean _writeBlocked;
84 
85     /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
86     private boolean _open;
87 
88     private volatile long _idleTimestamp;
89     private volatile boolean _checkIdle;
90 
91     private boolean _interruptable;
92 
93     private boolean _ishut;
94 
95     /* ------------------------------------------------------------ */
SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)96     public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
97         throws IOException
98     {
99         super(channel, maxIdleTime);
100 
101         _manager = selectSet.getManager();
102         _selectSet = selectSet;
103         _state=STATE_UNDISPATCHED;
104         _onIdle=false;
105         _open=true;
106         _key = key;
107 
108         setCheckForIdle(true);
109     }
110 
111     /* ------------------------------------------------------------ */
getSelectionKey()112     public SelectionKey getSelectionKey()
113     {
114         synchronized (this)
115         {
116             return _key;
117         }
118     }
119 
120     /* ------------------------------------------------------------ */
getSelectManager()121     public SelectorManager getSelectManager()
122     {
123         return _manager;
124     }
125 
126     /* ------------------------------------------------------------ */
getConnection()127     public Connection getConnection()
128     {
129         return _connection;
130     }
131 
132     /* ------------------------------------------------------------ */
setConnection(Connection connection)133     public void setConnection(Connection connection)
134     {
135         Connection old=_connection;
136         _connection=(AsyncConnection)connection;
137         if (old!=null && old!=_connection)
138             _manager.endPointUpgraded(this,old);
139     }
140 
141     /* ------------------------------------------------------------ */
getIdleTimestamp()142     public long getIdleTimestamp()
143     {
144         return _idleTimestamp;
145     }
146 
147     /* ------------------------------------------------------------ */
148     /** Called by selectSet to schedule handling
149      *
150      */
schedule()151     public void schedule()
152     {
153         synchronized (this)
154         {
155             // If there is no key, then do nothing
156             if (_key == null || !_key.isValid())
157             {
158                 _readBlocked=false;
159                 _writeBlocked=false;
160                 this.notifyAll();
161                 return;
162             }
163 
164             // If there are threads dispatched reading and writing
165             if (_readBlocked || _writeBlocked)
166             {
167                 // assert _dispatched;
168                 if (_readBlocked && _key.isReadable())
169                     _readBlocked=false;
170                 if (_writeBlocked && _key.isWritable())
171                     _writeBlocked=false;
172 
173                 // wake them up is as good as a dispatched.
174                 this.notifyAll();
175 
176                 // we are not interested in further selecting
177                 _key.interestOps(0);
178                 if (_state<STATE_DISPATCHED)
179                     updateKey();
180                 return;
181             }
182 
183             // Remove writeable op
184             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
185             {
186                 // Remove writeable op
187                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
188                 _key.interestOps(_interestOps);
189                 _writable = true; // Once writable is in ops, only removed with dispatch.
190             }
191 
192             // If dispatched, then deregister interest
193             if (_state>=STATE_DISPATCHED)
194                 _key.interestOps(0);
195             else
196             {
197                 // other wise do the dispatch
198                 dispatch();
199                 if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
200                 {
201                     _key.interestOps(0);
202                 }
203             }
204         }
205     }
206 
207     /* ------------------------------------------------------------ */
asyncDispatch()208     public void asyncDispatch()
209     {
210         synchronized(this)
211         {
212             switch(_state)
213             {
214                 case STATE_NEEDS_DISPATCH:
215                 case STATE_UNDISPATCHED:
216                     dispatch();
217                     break;
218 
219                 case STATE_DISPATCHED:
220                 case STATE_ASYNC:
221                     _state=STATE_ASYNC;
222                     break;
223             }
224         }
225     }
226 
227     /* ------------------------------------------------------------ */
dispatch()228     public void dispatch()
229     {
230         synchronized(this)
231         {
232             if (_state<=STATE_UNDISPATCHED)
233             {
234                 if (_onIdle)
235                     _state = STATE_NEEDS_DISPATCH;
236                 else
237                 {
238                     _state = STATE_DISPATCHED;
239                     boolean dispatched = _manager.dispatch(_handler);
240                     if(!dispatched)
241                     {
242                         _state = STATE_NEEDS_DISPATCH;
243                         LOG.warn("Dispatched Failed! "+this+" to "+_manager);
244                         updateKey();
245                     }
246                 }
247             }
248         }
249     }
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * Called when a dispatched thread is no longer handling the endpoint.
254      * The selection key operations are updated.
255      * @return If false is returned, the endpoint has been redispatched and
256      * thread must keep handling the endpoint.
257      */
undispatch()258     protected boolean undispatch()
259     {
260         synchronized (this)
261         {
262             switch(_state)
263             {
264                 case STATE_ASYNC:
265                     _state=STATE_DISPATCHED;
266                     return false;
267 
268                 default:
269                     _state=STATE_UNDISPATCHED;
270                     updateKey();
271                     return true;
272             }
273         }
274     }
275 
276     /* ------------------------------------------------------------ */
cancelTimeout(Task task)277     public void cancelTimeout(Task task)
278     {
279         getSelectSet().cancelTimeout(task);
280     }
281 
282     /* ------------------------------------------------------------ */
scheduleTimeout(Task task, long timeoutMs)283     public void scheduleTimeout(Task task, long timeoutMs)
284     {
285         getSelectSet().scheduleTimeout(task,timeoutMs);
286     }
287 
288     /* ------------------------------------------------------------ */
setCheckForIdle(boolean check)289     public void setCheckForIdle(boolean check)
290     {
291         if (check)
292         {
293             _idleTimestamp=System.currentTimeMillis();
294             _checkIdle=true;
295         }
296         else
297             _checkIdle=false;
298     }
299 
300     /* ------------------------------------------------------------ */
isCheckForIdle()301     public boolean isCheckForIdle()
302     {
303         return _checkIdle;
304     }
305 
306     /* ------------------------------------------------------------ */
notIdle()307     protected void notIdle()
308     {
309         _idleTimestamp=System.currentTimeMillis();
310     }
311 
312     /* ------------------------------------------------------------ */
checkIdleTimestamp(long now)313     public void checkIdleTimestamp(long now)
314     {
315         if (isCheckForIdle() && _maxIdleTime>0)
316         {
317             final long idleForMs=now-_idleTimestamp;
318 
319             if (idleForMs>_maxIdleTime)
320             {
321                 // Don't idle out again until onIdleExpired task completes.
322                 setCheckForIdle(false);
323                 _manager.dispatch(new Runnable()
324                 {
325                     public void run()
326                     {
327                         try
328                         {
329                             onIdleExpired(idleForMs);
330                         }
331                         finally
332                         {
333                             setCheckForIdle(true);
334                         }
335                     }
336                 });
337             }
338         }
339     }
340 
341     /* ------------------------------------------------------------ */
onIdleExpired(long idleForMs)342     public void onIdleExpired(long idleForMs)
343     {
344         try
345         {
346             synchronized (this)
347             {
348                 _onIdle=true;
349             }
350 
351             _connection.onIdleExpired(idleForMs);
352         }
353         finally
354         {
355             synchronized (this)
356             {
357                 _onIdle=false;
358                 if (_state==STATE_NEEDS_DISPATCH)
359                     dispatch();
360             }
361         }
362     }
363 
364     /* ------------------------------------------------------------ */
365     @Override
fill(Buffer buffer)366     public int fill(Buffer buffer) throws IOException
367     {
368         int fill=super.fill(buffer);
369         if (fill>0)
370             notIdle();
371         return fill;
372     }
373 
374     /* ------------------------------------------------------------ */
375     @Override
flush(Buffer header, Buffer buffer, Buffer trailer)376     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
377     {
378         int l = super.flush(header, buffer, trailer);
379 
380         // If there was something to write and it wasn't written, then we are not writable.
381         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
382         {
383             synchronized (this)
384             {
385                 _writable=false;
386                 if (_state<STATE_DISPATCHED)
387                     updateKey();
388             }
389         }
390         else if (l>0)
391         {
392             _writable=true;
393             notIdle();
394         }
395         return l;
396     }
397 
398     /* ------------------------------------------------------------ */
399     /*
400      */
401     @Override
flush(Buffer buffer)402     public int flush(Buffer buffer) throws IOException
403     {
404         int l = super.flush(buffer);
405 
406         // If there was something to write and it wasn't written, then we are not writable.
407         if (l==0 && buffer!=null && buffer.hasContent())
408         {
409             synchronized (this)
410             {
411                 _writable=false;
412                 if (_state<STATE_DISPATCHED)
413                     updateKey();
414             }
415         }
416         else if (l>0)
417         {
418             _writable=true;
419             notIdle();
420         }
421 
422         return l;
423     }
424 
425     /* ------------------------------------------------------------ */
426     /*
427      * Allows thread to block waiting for further events.
428      */
429     @Override
blockReadable(long timeoutMs)430     public boolean blockReadable(long timeoutMs) throws IOException
431     {
432         synchronized (this)
433         {
434             if (isInputShutdown())
435                 throw new EofException();
436 
437             long now=_selectSet.getNow();
438             long end=now+timeoutMs;
439             boolean check=isCheckForIdle();
440             setCheckForIdle(true);
441             try
442             {
443                 _readBlocked=true;
444                 while (!isInputShutdown() && _readBlocked)
445                 {
446                     try
447                     {
448                         updateKey();
449                         this.wait(timeoutMs>0?(end-now):10000);
450                     }
451                     catch (final InterruptedException e)
452                     {
453                         LOG.warn(e);
454                         if (_interruptable)
455                             throw new InterruptedIOException(){{this.initCause(e);}};
456                     }
457                     finally
458                     {
459                         now=_selectSet.getNow();
460                     }
461 
462                     if (_readBlocked && timeoutMs>0 && now>=end)
463                         return false;
464                 }
465             }
466             finally
467             {
468                 _readBlocked=false;
469                 setCheckForIdle(check);
470             }
471         }
472         return true;
473     }
474 
475     /* ------------------------------------------------------------ */
476     /*
477      * Allows thread to block waiting for further events.
478      */
479     @Override
blockWritable(long timeoutMs)480     public boolean blockWritable(long timeoutMs) throws IOException
481     {
482         synchronized (this)
483         {
484             if (isOutputShutdown())
485                 throw new EofException();
486 
487             long now=_selectSet.getNow();
488             long end=now+timeoutMs;
489             boolean check=isCheckForIdle();
490             setCheckForIdle(true);
491             try
492             {
493                 _writeBlocked=true;
494                 while (_writeBlocked && !isOutputShutdown())
495                 {
496                     try
497                     {
498                         updateKey();
499                         this.wait(timeoutMs>0?(end-now):10000);
500                     }
501                     catch (final InterruptedException e)
502                     {
503                         LOG.warn(e);
504                         if (_interruptable)
505                             throw new InterruptedIOException(){{this.initCause(e);}};
506                     }
507                     finally
508                     {
509                         now=_selectSet.getNow();
510                     }
511                     if (_writeBlocked && timeoutMs>0 && now>=end)
512                         return false;
513                 }
514             }
515             finally
516             {
517                 _writeBlocked=false;
518                 setCheckForIdle(check);
519             }
520         }
521         return true;
522     }
523 
524     /* ------------------------------------------------------------ */
525     /** Set the interruptable mode of the endpoint.
526      * If set to false (default), then interrupts are assumed to be spurious
527      * and blocking operations continue unless the endpoint has been closed.
528      * If true, then interrupts of blocking operations result in InterruptedIOExceptions
529      * being thrown.
530      * @param interupable
531      */
setInterruptable(boolean interupable)532     public void setInterruptable(boolean interupable)
533     {
534         synchronized (this)
535         {
536             _interruptable=interupable;
537         }
538     }
539 
540     /* ------------------------------------------------------------ */
isInterruptable()541     public boolean isInterruptable()
542     {
543         return _interruptable;
544     }
545 
546     /* ------------------------------------------------------------ */
547     /**
548      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
549      */
scheduleWrite()550     public void scheduleWrite()
551     {
552         if (_writable)
553             LOG.debug("Required scheduleWrite {}",this);
554 
555         _writable=false;
556         updateKey();
557     }
558 
559     /* ------------------------------------------------------------ */
isWritable()560     public boolean isWritable()
561     {
562         return _writable;
563     }
564 
565     /* ------------------------------------------------------------ */
hasProgressed()566     public boolean hasProgressed()
567     {
568         return false;
569     }
570 
571     /* ------------------------------------------------------------ */
572     /**
573      * Updates selection key. Adds operations types to the selection key as needed. No operations
574      * are removed as this is only done during dispatch. This method records the new key and
575      * schedules a call to doUpdateKey to do the keyChange
576      */
updateKey()577     private void updateKey()
578     {
579         final boolean changed;
580         synchronized (this)
581         {
582             int current_ops=-1;
583             if (getChannel().isOpen())
584             {
585                 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
586                 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
587 
588                 _interestOps =
589                     ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
590                 |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
591                 try
592                 {
593                     current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
594                 }
595                 catch(Exception e)
596                 {
597                     _key=null;
598                     LOG.ignore(e);
599                 }
600             }
601             changed=_interestOps!=current_ops;
602         }
603 
604         if(changed)
605         {
606             _selectSet.addChange(this);
607             _selectSet.wakeup();
608         }
609     }
610 
611 
612     /* ------------------------------------------------------------ */
613     /**
614      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
615      */
doUpdateKey()616     void doUpdateKey()
617     {
618         synchronized (this)
619         {
620             if (getChannel().isOpen())
621             {
622                 if (_interestOps>0)
623                 {
624                     if (_key==null || !_key.isValid())
625                     {
626                         SelectableChannel sc = (SelectableChannel)getChannel();
627                         if (sc.isRegistered())
628                         {
629                             updateKey();
630                         }
631                         else
632                         {
633                             try
634                             {
635                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
636                             }
637                             catch (Exception e)
638                             {
639                                 LOG.ignore(e);
640                                 if (_key!=null && _key.isValid())
641                                 {
642                                     _key.cancel();
643                                 }
644 
645                                 if (_open)
646                                 {
647                                     _selectSet.destroyEndPoint(this);
648                                 }
649                                 _open=false;
650                                 _key = null;
651                             }
652                         }
653                     }
654                     else
655                     {
656                         _key.interestOps(_interestOps);
657                     }
658                 }
659                 else
660                 {
661                     if (_key!=null && _key.isValid())
662                         _key.interestOps(0);
663                     else
664                         _key=null;
665                 }
666             }
667             else
668             {
669                 if (_key!=null && _key.isValid())
670                     _key.cancel();
671 
672                 if (_open)
673                 {
674                     _open=false;
675                     _selectSet.destroyEndPoint(this);
676                 }
677                 _key = null;
678             }
679         }
680     }
681 
682     /* ------------------------------------------------------------ */
683     /*
684      */
handle()685     protected void handle()
686     {
687         boolean dispatched=true;
688         try
689         {
690             while(dispatched)
691             {
692                 try
693                 {
694                     while(true)
695                     {
696                         final AsyncConnection next = (AsyncConnection)_connection.handle();
697                         if (next!=_connection)
698                         {
699                             LOG.debug("{} replaced {}",next,_connection);
700                             Connection old=_connection;
701                             _connection=next;
702                             _manager.endPointUpgraded(this,old);
703                             continue;
704                         }
705                         break;
706                     }
707                 }
708                 catch (ClosedChannelException e)
709                 {
710                     LOG.ignore(e);
711                 }
712                 catch (EofException e)
713                 {
714                     LOG.debug("EOF", e);
715                     try{close();}
716                     catch(IOException e2){LOG.ignore(e2);}
717                 }
718                 catch (IOException e)
719                 {
720                     LOG.warn(e.toString());
721                     try{close();}
722                     catch(IOException e2){LOG.ignore(e2);}
723                 }
724                 catch (Throwable e)
725                 {
726                     LOG.warn("handle failed", e);
727                     try{close();}
728                     catch(IOException e2){LOG.ignore(e2);}
729                 }
730                 finally
731                 {
732                     if (!_ishut && isInputShutdown() && isOpen())
733                     {
734                         _ishut=true;
735                         try
736                         {
737                             _connection.onInputShutdown();
738                         }
739                         catch(Throwable x)
740                         {
741                             LOG.warn("onInputShutdown failed", x);
742                             try{close();}
743                             catch(IOException e2){LOG.ignore(e2);}
744                         }
745                         finally
746                         {
747                             updateKey();
748                         }
749                     }
750                     dispatched=!undispatch();
751                 }
752             }
753         }
754         finally
755         {
756             if (dispatched)
757             {
758                 dispatched=!undispatch();
759                 while (dispatched)
760                 {
761                     LOG.warn("SCEP.run() finally DISPATCHED");
762                     dispatched=!undispatch();
763                 }
764             }
765         }
766     }
767 
768     /* ------------------------------------------------------------ */
769     /*
770      * @see org.eclipse.io.nio.ChannelEndPoint#close()
771      */
772     @Override
close()773     public void close() throws IOException
774     {
775         // On unix systems there is a JVM issue that if you cancel before closing, it can
776         // cause the selector to block waiting for a channel to close and that channel can
777         // block waiting for the remote end.  But on windows, if you don't cancel before a
778         // close, then the selector can block anyway!
779         // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
780         if (WORK_AROUND_JVM_BUG_6346658)
781         {
782             try
783             {
784                 SelectionKey key = _key;
785                 if (key!=null)
786                     key.cancel();
787             }
788             catch (Throwable e)
789             {
790                 LOG.ignore(e);
791             }
792         }
793 
794         try
795         {
796             super.close();
797         }
798         catch (IOException e)
799         {
800             LOG.ignore(e);
801         }
802         finally
803         {
804             updateKey();
805         }
806     }
807 
808     /* ------------------------------------------------------------ */
809     @Override
toString()810     public String toString()
811     {
812         // Do NOT use synchronized (this)
813         // because it's very easy to deadlock when debugging is enabled.
814         // We do a best effort to print the right toString() and that's it.
815         SelectionKey key = _key;
816         String keyString = "";
817         if (key != null)
818         {
819             if (key.isValid())
820             {
821                 if (key.isReadable())
822                     keyString += "r";
823                 if (key.isWritable())
824                     keyString += "w";
825             }
826             else
827             {
828                 keyString += "!";
829             }
830         }
831         else
832         {
833             keyString += "-";
834         }
835         return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
836                 hashCode(),
837                 _socket.getRemoteSocketAddress(),
838                 _socket.getLocalSocketAddress(),
839                 _state,
840                 isOpen(),
841                 isInputShutdown(),
842                 isOutputShutdown(),
843                 _readBlocked,
844                 _writeBlocked,
845                 _writable,
846                 _interestOps,
847                 keyString,
848                 _connection);
849     }
850 
851     /* ------------------------------------------------------------ */
getSelectSet()852     public SelectSet getSelectSet()
853     {
854         return _selectSet;
855     }
856 
857     /* ------------------------------------------------------------ */
858     /**
859      * Don't set the SoTimeout
860      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
861      */
862     @Override
setMaxIdleTime(int timeMs)863     public void setMaxIdleTime(int timeMs) throws IOException
864     {
865         _maxIdleTime=timeMs;
866     }
867 
868 }
869