/*
* Conditions Of Use
*
* This software was developed by employees of the National Institute of
* Standards and Technology (NIST), an agency of the Federal Government.
* Pursuant to title 15 United States Code Section 105, works of NIST
* employees are not subject to copyright protection in the United States
* and are considered to be in the public domain. As a result, a formal
* license is not needed to use the software.
*
* This software is provided by NIST as a service and is expressly
* provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
* OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
* AND DATA ACCURACY. NIST does not warrant or make any representations
* regarding the use of the software or the results thereof, including but
* not limited to the correctness, accuracy, reliability or usefulness of
* the software.
*
* Permission to use this software is contingent upon your acceptance
* of the terms of this agreement
*
* .
*
*/
/*******************************************************************************
* Product of NIST/ITL Advanced Networking Technologies Division (ANTD). *
*******************************************************************************/
package gov.nist.javax.sip.stack;
import java.io.IOException;
import java.util.LinkedList;
import java.net.*;
import gov.nist.core.*;
/**
* Sit in a loop and handle incoming UDP datagram messages. For each datagram
* packet, a new UDPMessageChannel is created (up to the max thread pool size).
* Each UDP message is processed in its own thread.
*
* @version 1.2 $Revision: 1.37 $ $Date: 2009/11/14 20:06:16 $
*
* @author M. Ranganathan
*
*
*
*
* See the implementation sequence diagram for processing incoming requests.
*
*
*
* Acknowledgement: Jeff Keyser contributed ideas on starting and stopping the
* stack that were incorporated into this code. Niklas Uhrberg suggested that
* thread pooling be added to limit the number of threads and improve
* performance.
*/
public class UDPMessageProcessor extends MessageProcessor {

    /**
     * The port passed to the constructor (the mapped port in case STUN
     * support is enabled).
     */
    private int port;

    /**
     * Incoming datagram packets are queued here when a thread pool is in use.
     * All accesses must synchronize on this list; it is also the monitor that
     * waiting message channels are notified on.
     */
    protected LinkedList messageQueue;

    /**
     * A list of message channels that we have started. Created by
     * {@link #run()}, so it is <code>null</code> until the processor thread
     * has begun executing.
     */
    protected LinkedList messageChannels;

    /**
     * Max # of udp message channels. Not assigned anywhere in this class;
     * {@link #run()} consults <code>sipStack.threadPoolSize</code> instead.
     */
    protected int threadPoolSize;

    /**
     * The datagram socket on which incoming messages are received.
     */
    protected DatagramSocket sock;

    /**
     * A flag that is set to false to exit the message processor (suggestion by
     * Jeff Keyser). Declared volatile because it is written by the thread
     * calling {@link #stop()} and read by the receiver thread in
     * {@link #run()}.
     */
    protected volatile boolean isRunning;

    /** Queue length at or above which every incoming packet is dropped. */
    private static final int HIGHWAT = 5000;

    /** Queue length above which packets start being dropped at random. */
    private static final int LOWAT = 2500;

    /**
     * Constructor. Creates and configures the datagram socket.
     *
     * @param ipAddress
     *            address to bind the socket to.
     * @param sipStack
     *            pointer to the stack.
     * @param port
     *            port to bind the socket to.
     * @throws IOException
     *             if the socket cannot be created or configured. The
     *             underlying SocketException is attached as the cause.
     */
    protected UDPMessageProcessor(InetAddress ipAddress,
            SIPTransactionStack sipStack, int port) throws IOException {
        super(ipAddress, port, "udp", sipStack);

        this.sipStack = sipStack;
        this.messageQueue = new LinkedList();
        this.port = port;

        try {
            // Create a new datagram socket.
            this.sock = sipStack.getNetworkLayer().createDatagramSocket(port,
                    ipAddress);
            sock.setReceiveBufferSize(sipStack.getReceiveUdpBufferSize());
            sock.setSendBufferSize(sipStack.getSendUdpBufferSize());

            // If the thread auditor is enabled, define a socket timeout value
            // in order to prevent sock.receive() from blocking forever.
            if (sipStack.getThreadAuditor().isEnabled()) {
                sock.setSoTimeout((int) sipStack.getThreadAuditor()
                        .getPingIntervalInMillisecs());
            }

            String hostAddress = ipAddress.getHostAddress();
            if (hostAddress.equals(IN_ADDR_ANY)
                    || hostAddress.equals(IN6_ADDR_ANY)) {
                // Store the address to which we are actually bound.
                // Note that on WINDOWS this is actually broken. It will
                // return IN_ADDR_ANY again. On linux it will return the
                // address to which the socket was actually bound.
                super.setIpAddress(sock.getLocalAddress());
            }
        } catch (SocketException ex) {
            // The socket may already be bound when one of the configuration
            // calls fails; release it so the port is not leaked.
            if (this.sock != null) {
                this.sock.close();
            }
            // Keep the original exception as the cause for diagnostics.
            IOException wrapped = new IOException(ex.getMessage());
            wrapped.initCause(ex);
            throw wrapped;
        }
    }

    /**
     * Get port on which to listen for incoming stuff.
     *
     * @return port on which I am listening.
     */
    public int getPort() {
        return this.port;
    }

    /**
     * Start our processor thread. The thread is a daemon thread running at
     * maximum priority.
     */
    public void start() throws IOException {
        this.isRunning = true;
        Thread thread = new Thread(this);
        thread.setDaemon(true);
        // Issue #32 on java.net
        thread.setName("UDPMessageProcessorThread");
        // Issue #184
        thread.setPriority(Thread.MAX_PRIORITY);
        thread.start();
    }

    /**
     * Thread main routine. Receives datagrams until {@link #isRunning} is
     * cleared or the socket fails, and either queues each packet for the
     * pooled message channels or hands it to a freshly created channel.
     */
    public void run() {
        this.messageChannels = new LinkedList();

        // Start all our messageChannels (unless the thread pool size is
        // infinity).
        if (sipStack.threadPoolSize != -1) {
            for (int i = 0; i < sipStack.threadPoolSize; i++) {
                UDPMessageChannel channel = new UDPMessageChannel(sipStack,
                        this);
                this.messageChannels.add(channel);
            }
        }

        // Ask the auditor to monitor this thread.
        ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor()
                .addCurrentThread();

        // Somebody asked us to exit if isRunning is set to false.
        while (this.isRunning) {
            try {
                // Let the thread auditor know we're up and running.
                threadHandle.ping();

                // A fresh buffer is needed per packet because the packet
                // (and its buffer) is handed off to another thread.
                int bufsize = sock.getReceiveBufferSize();
                byte message[] = new byte[bufsize];
                DatagramPacket packet = new DatagramPacket(message, bufsize);
                sock.receive(packet);

                // TODO -- penalize spammers by looking at the source
                // port and IP address.
                if (sipStack.stackDoesCongestionControl
                        && shouldDropForCongestion()) {
                    continue;
                }

                if (sipStack.threadPoolSize != -1) {
                    // Note: the only condition watched for by threads
                    // synchronizing on the messageQueue member is that it is
                    // not empty. As soon as you introduce some other
                    // condition you will have to call notifyAll instead of
                    // notify below.
                    synchronized (this.messageQueue) {
                        this.messageQueue.add(packet);
                        this.messageQueue.notify();
                    }
                } else {
                    // No pool: a new channel is created for this packet.
                    new UDPMessageChannel(sipStack, this, packet);
                }
            } catch (SocketTimeoutException ex) {
                // This socket timeout allows us to ping the thread auditor
                // periodically; nothing else to do.
            } catch (SocketException ex) {
                if (sipStack.isLoggingEnabled()) {
                    getSIPStack().getStackLogger().logDebug(
                            "UDPMessageProcessor: Stopping");
                }
                signalShutdownToChannels();
            } catch (IOException ex) {
                // Wake waiting channels here as well; otherwise they would
                // keep waiting on a queue that is never fed again.
                signalShutdownToChannels();
                ex.printStackTrace();
                if (sipStack.isLoggingEnabled()) {
                    getSIPStack().getStackLogger().logDebug(
                            "UDPMessageProcessor: Got an IO Exception");
                }
            } catch (Exception ex) {
                if (sipStack.isLoggingEnabled()) {
                    getSIPStack().getStackLogger().logDebug(
                            "UDPMessageProcessor: Unexpected Exception - quitting");
                }
                // We are quitting: mark the processor as stopped and wake
                // the channels before delegating to the error handler.
                signalShutdownToChannels();
                InternalErrorHandler.handleException(ex);
                return;
            }
        }
    }

    /**
     * Simplistic congestion control. Accepts the packet if the queue length
     * is at most LOWAT, drops it if the queue length has reached HIGHWAT, and
     * in between drops it with a probability that grows linearly from 0 to 1
     * with the queue length.
     *
     * @return true if the packet just received should be discarded.
     */
    private boolean shouldDropForCongestion() {
        // Take one consistent snapshot under the queue lock; the queue is
        // modified concurrently by other threads.
        int queueLength;
        synchronized (this.messageQueue) {
            queueLength = this.messageQueue.size();
        }

        if (queueLength >= HIGHWAT) {
            if (sipStack.isLoggingEnabled()) {
                sipStack.getStackLogger().logDebug(
                        "Dropping message -- queue length exceeded");
            }
            return true;
        }

        if (queueLength > LOWAT) {
            // Fraction of the way from LOWAT to HIGHWAT; this is the drop
            // probability since Math.random() is uniform on [0, 1).
            float threshold = ((float) (queueLength - LOWAT))
                    / ((float) (HIGHWAT - LOWAT));
            if (Math.random() > 1.0 - threshold) {
                if (sipStack.isLoggingEnabled()) {
                    sipStack.getStackLogger().logDebug(
                            "Dropping message with probability " + threshold);
                }
                return true;
            }
        }
        return false;
    }

    /**
     * Clears the running flag and wakes every thread waiting on the message
     * queue so that it can observe the flag.
     */
    private void signalShutdownToChannels() {
        // The notifyAll should be in a synchronized block.
        // ( bug report by Niklas Uhrberg ).
        synchronized (this.messageQueue) {
            this.isRunning = false;
            this.messageQueue.notifyAll();
        }
    }

    /**
     * Shut down the message processor. Close the socket for receiving incoming
     * messages and wake any threads waiting on the message queue.
     */
    public void stop() {
        synchronized (this.messageQueue) {
            this.isRunning = false;
            this.messageQueue.notifyAll();
            sock.close();
        }
    }

    /**
     * Return the transport string.
     *
     * @return the transport string
     */
    public String getTransport() {
        return "udp";
    }

    /**
     * Returns the stack.
     *
     * @return my sip stack.
     */
    public SIPTransactionStack getSIPStack() {
        return sipStack;
    }

    /**
     * Create and return new UDPMessageChannel for the given host/port.
     *
     * @param targetHostPort
     *            host and port of the peer.
     * @return a new UDPMessageChannel bound to this processor.
     */
    public MessageChannel createMessageChannel(HostPort targetHostPort)
            throws UnknownHostException {
        return new UDPMessageChannel(targetHostPort.getInetAddress(),
                targetHostPort.getPort(), sipStack, this);
    }

    /**
     * Create and return new UDPMessageChannel for the given address/port.
     *
     * @param host
     *            address of the peer.
     * @param port
     *            port of the peer.
     * @return a new UDPMessageChannel bound to this processor.
     */
    public MessageChannel createMessageChannel(InetAddress host, int port)
            throws IOException {
        return new UDPMessageChannel(host, port, sipStack, this);
    }

    /**
     * Default target port for UDP
     *
     * @return 5060
     */
    public int getDefaultTargetPort() {
        return 5060;
    }

    /**
     * UDP is not a secure protocol.
     *
     * @return always false.
     */
    public boolean isSecure() {
        return false;
    }

    /**
     * UDP can handle a message as large as the MAX_DATAGRAM_SIZE.
     *
     * @return 8192 (8 * 1024) bytes.
     */
    public int getMaximumMessageSize() {
        return 8 * 1024;
    }

    /**
     * Return true if there are any messages waiting in the queue.
     */
    public boolean inUse() {
        synchronized (messageQueue) {
            return messageQueue.size() != 0;
        }
    }
}