1 /*
2 * Conditions Of Use
3 *
4 * This software was developed by employees of the National Institute of
5 * Standards and Technology (NIST), an agency of the Federal Government.
6 * Pursuant to title 15 Untied States Code Section 105, works of NIST
7 * employees are not subject to copyright protection in the United States
8 * and are considered to be in the public domain.  As a result, a formal
9 * license is not needed to use the software.
10 *
11 * This software is provided by NIST as a service and is expressly
12 * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
13 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
14 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
15 * AND DATA ACCURACY.  NIST does not warrant or make any representations
16 * regarding the use of the software or the results thereof, including but
17 * not limited to the correctness, accuracy, reliability or usefulness of
18 * the software.
19 *
20 * Permission to use this software is contingent upon your acceptance
21 * of the terms of this agreement
22 *
23 * .
24 *
25 */
26 /*******************************************************************************
27  *   Product of NIST/ITL Advanced Networking Technologies Division (ANTD).     *
28  *******************************************************************************/
29 package gov.nist.javax.sip.stack;
30 
31 import java.io.IOException;
32 import java.util.LinkedList;
33 import java.net.*;
34 
35 import gov.nist.core.*;
36 
37 /**
38  * Sit in a loop and handle incoming udp datagram messages. For each Datagram
39  * packet, a new UDPMessageChannel is created (upto the max thread pool size).
40  * Each UDP message is processed in its own thread).
41  *
42  * @version 1.2 $Revision: 1.37 $ $Date: 2009/11/14 20:06:16 $
43  *
44  * @author M. Ranganathan  <br/>
45  *
46  *
47  *
48  * <a href="{@docRoot}/../uml/udp-request-processing-sequence-diagram.jpg">
49  * See the implementation sequence diagram for processing incoming requests.
50  * </a>
51  *
52  *
53  * Acknowledgement: Jeff Keyser contributed ideas on starting and stoppping the
54  * stack that were incorporated into this code. Niklas Uhrberg suggested that
55  * thread pooling be added to limit the number of threads and improve
56  * performance.
57  */
58 public class UDPMessageProcessor extends MessageProcessor {
59     /**
60      * The Mapped port (in case STUN suport is enabled)
61      */
62     private int port;
63 
64     /**
65      * Incoming messages are queued here.
66      */
67     protected LinkedList messageQueue;
68 
69     /**
70      * A list of message channels that we have started.
71      */
72     protected LinkedList messageChannels;
73 
74     /**
75      * Max # of udp message channels
76      */
77     protected int threadPoolSize;
78 
79     protected DatagramSocket sock;
80 
81     /**
82      * A flag that is set to false to exit the message processor (suggestion by
83      * Jeff Keyser).
84      */
85     protected boolean isRunning;
86 
87     private static final int HIGHWAT=5000;
88 
89     private static final int LOWAT=2500;
90 
91     /**
92      * Constructor.
93      *
94      * @param sipStack
95      *            pointer to the stack.
96      */
UDPMessageProcessor(InetAddress ipAddress, SIPTransactionStack sipStack, int port)97     protected UDPMessageProcessor(InetAddress ipAddress,
98             SIPTransactionStack sipStack, int port) throws IOException {
99         super(ipAddress, port, "udp",sipStack);
100 
101         this.sipStack = sipStack;
102 
103         this.messageQueue = new LinkedList();
104 
105         this.port = port;
106         try {
107             this.sock = sipStack.getNetworkLayer().createDatagramSocket(port,
108                     ipAddress);
109             // Create a new datagram socket.
110             sock.setReceiveBufferSize(sipStack.getReceiveUdpBufferSize());
111             sock.setSendBufferSize(sipStack.getSendUdpBufferSize());
112 
113             /**
114              * If the thread auditor is enabled, define a socket timeout value in order to
115              * prevent sock.receive() from blocking forever
116              */
117             if (sipStack.getThreadAuditor().isEnabled()) {
118                 sock.setSoTimeout((int) sipStack.getThreadAuditor().getPingIntervalInMillisecs());
119             }
120             if ( ipAddress.getHostAddress().equals(IN_ADDR_ANY)  ||
121                  ipAddress.getHostAddress().equals(IN6_ADDR_ANY)){
122                 // Store the address to which we are actually bound
123                 // Note that on WINDOWS this is actually broken. It will
124                 // return IN_ADDR_ANY again. On linux it will return the
125                 // address to which the socket was actually bound.
126                 super.setIpAddress( sock.getLocalAddress() );
127 
128             }
129         } catch (SocketException ex) {
130             throw new IOException(ex.getMessage());
131         }
132     }
133 
134 
135 
136     /**
137      * Get port on which to listen for incoming stuff.
138      *
139      * @return port on which I am listening.
140      */
getPort()141     public int getPort() {
142         return this.port;
143     }
144 
145     /**
146      * Start our processor thread.
147      */
start()148     public void start() throws IOException {
149 
150 
151         this.isRunning = true;
152         Thread thread = new Thread(this);
153         thread.setDaemon(true);
154         // Issue #32 on java.net
155         thread.setName("UDPMessageProcessorThread");
156         // Issue #184
157         thread.setPriority(Thread.MAX_PRIORITY);
158         thread.start();
159     }
160 
161     /**
162      * Thread main routine.
163      */
run()164     public void run() {
165         // Check for running flag.
166         this.messageChannels = new LinkedList();
167         // start all our messageChannels (unless the thread pool size is
168         // infinity.
169         if (sipStack.threadPoolSize != -1) {
170             for (int i = 0; i < sipStack.threadPoolSize; i++) {
171                 UDPMessageChannel channel = new UDPMessageChannel(sipStack,
172                         this);
173                 this.messageChannels.add(channel);
174 
175             }
176         }
177 
178         // Ask the auditor to monitor this thread
179         ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor().addCurrentThread();
180 
181         // Somebody asked us to exit. if isRunnning is set to false.
182         while (this.isRunning) {
183 
184             try {
185                 // Let the thread auditor know we're up and running
186                 threadHandle.ping();
187 
188                 int bufsize = sock.getReceiveBufferSize();
189                 byte message[] = new byte[bufsize];
190                 DatagramPacket packet = new DatagramPacket(message, bufsize);
191                 sock.receive(packet);
192 
193 
194 
195              // This is a simplistic congestion control algorithm.
196              // It accepts packets if queuesize is < LOWAT. It drops
197              // requests if the queue size exceeds a HIGHWAT and accepts
198              // requests with probability p proportional to the difference
199              // between current queue size and LOWAT in the range
200              // of queue sizes between HIGHWAT and LOWAT.
201              // TODO -- penalize spammers by looking at the source
202              // port and IP address.
203              if ( sipStack.stackDoesCongestionControl ) {
204              if ( this.messageQueue.size() >= HIGHWAT) {
205                     if (sipStack.isLoggingEnabled()) {
206                         sipStack.getStackLogger().logDebug("Dropping message -- queue length exceeded");
207 
208                     }
209                     //System.out.println("HIGHWAT Drop!");
210                     continue;
211                 } else if ( this.messageQueue.size() > LOWAT && this .messageQueue.size() < HIGHWAT ) {
212                     // Drop the message with a probabilty that is linear in the range 0 to 1
213                     float threshold = ((float)(messageQueue.size() - LOWAT))/ ((float)(HIGHWAT - LOWAT));
214                     boolean decision = Math.random() > 1.0 - threshold;
215                     if ( decision ) {
216                         if (sipStack.isLoggingEnabled()) {
217                             sipStack.getStackLogger().logDebug("Dropping message with probability  " + (1.0 - threshold));
218 
219                         }
220                         //System.out.println("RED Drop!");
221                         continue;
222                     }
223 
224                 }
225              }
226 
227 
228 
229                 // Count of # of packets in process.
230                 // this.useCount++;
231                 if (sipStack.threadPoolSize != -1) {
232                     // Note: the only condition watched for by threads
233                     // synchronizing on the messageQueue member is that it is
234                     // not empty. As soon as you introduce some other
235                     // condition you will have to call notifyAll instead of
236                     // notify below.
237 
238                     synchronized (this.messageQueue) {
239                         // was addLast
240                         this.messageQueue.add(packet);
241                         this.messageQueue.notify();
242                     }
243                 } else {
244                     new UDPMessageChannel(sipStack, this, packet);
245                 }
246             } catch (SocketTimeoutException ex) {
247               // This socket timeout alows us to ping the thread auditor periodically
248             } catch (SocketException ex) {
249                 if (sipStack.isLoggingEnabled())
250                     getSIPStack().getStackLogger()
251                             .logDebug("UDPMessageProcessor: Stopping");
252                 isRunning = false;
253                 // The notifyAll should be in a synchronized block.
254                 // ( bug report by Niklas Uhrberg ).
255                 synchronized (this.messageQueue) {
256                     this.messageQueue.notifyAll();
257                 }
258             } catch (IOException ex) {
259                 isRunning = false;
260                 ex.printStackTrace();
261                 if (sipStack.isLoggingEnabled())
262                     getSIPStack().getStackLogger()
263                             .logDebug("UDPMessageProcessor: Got an IO Exception");
264             } catch (Exception ex) {
265                 if (sipStack.isLoggingEnabled())
266                     getSIPStack().getStackLogger()
267                             .logDebug("UDPMessageProcessor: Unexpected Exception - quitting");
268                 InternalErrorHandler.handleException(ex);
269                 return;
270             }
271         }
272     }
273 
274     /**
275      * Shut down the message processor. Close the socket for recieving incoming
276      * messages.
277      */
stop()278     public void stop() {
279         synchronized (this.messageQueue) {
280             this.isRunning = false;
281             this.messageQueue.notifyAll();
282             sock.close();
283 
284 
285         }
286     }
287 
288     /**
289      * Return the transport string.
290      *
291      * @return the transport string
292      */
getTransport()293     public String getTransport() {
294         return "udp";
295     }
296 
297     /**
298      * Returns the stack.
299      *
300      * @return my sip stack.
301      */
getSIPStack()302     public SIPTransactionStack getSIPStack() {
303         return sipStack;
304     }
305 
306     /**
307      * Create and return new TCPMessageChannel for the given host/port.
308      */
createMessageChannel(HostPort targetHostPort)309     public MessageChannel createMessageChannel(HostPort targetHostPort)
310             throws UnknownHostException {
311         return new UDPMessageChannel(targetHostPort.getInetAddress(),
312                 targetHostPort.getPort(), sipStack, this);
313     }
314 
createMessageChannel(InetAddress host, int port)315     public MessageChannel createMessageChannel(InetAddress host, int port)
316             throws IOException {
317         return new UDPMessageChannel(host, port, sipStack, this);
318     }
319 
320     /**
321      * Default target port for UDP
322      */
getDefaultTargetPort()323     public int getDefaultTargetPort() {
324         return 5060;
325     }
326 
327     /**
328      * UDP is not a secure protocol.
329      */
isSecure()330     public boolean isSecure() {
331         return false;
332     }
333 
334     /**
335      * UDP can handle a message as large as the MAX_DATAGRAM_SIZE.
336      */
getMaximumMessageSize()337     public int getMaximumMessageSize() {
338         return 8*1024;
339     }
340 
341     /**
342      * Return true if there are any messages in use.
343      */
inUse()344     public boolean inUse() {
345         synchronized (messageQueue) {
346             return messageQueue.size() != 0;
347         }
348     }
349 
350 }
351