Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Wed Aug 8 11:56:59 2007 @@ -31,21 +31,22 @@ import org.apache.activemq.wireformat.WireFormat; /** - * Implements marshalling and unmarsalling the Stomp protocol. + * Implements marshalling and unmarsalling the Stomp protocol. */ public class StompWireFormat implements WireFormat { - private static final byte[] NO_DATA = new byte[]{}; - private static final byte[] END_OF_FRAME = new byte[]{0,'\n'}; - - private static final int MAX_COMMAND_LENGTH = 1024; - private static final int MAX_HEADER_LENGTH = 1024*10; - private static final int MAX_HEADERS = 1000; - private static final int MAX_DATA_LENGTH = 1024*1024*100; - - private int version=1; + private static final byte[] NO_DATA = new byte[] {}; + private static final byte[] END_OF_FRAME = new byte[] {0, '\n'}; - public ByteSequence marshal(Object command) throws IOException { + private static final int MAX_COMMAND_LENGTH = 1024; + private static final int MAX_HEADER_LENGTH = 1024 * 10; + private static final int MAX_HEADERS = 1000; + private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; + + private int version = 1; + + public ByteSequence marshal(Object command) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); marshal(command, dos); @@ -60,140 +61,137 @@ } public void marshal(Object command, DataOutput os) throws IOException { - StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command; + StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command; + + StringBuffer buffer = new StringBuffer(); + buffer.append(stomp.getAction()); + buffer.append(Stomp.NEWLINE); + + // Output the headers. + for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = (Map.Entry)iter.next(); + buffer.append(entry.getKey()); + buffer.append(Stomp.Headers.SEPERATOR); + buffer.append(entry.getValue()); + buffer.append(Stomp.NEWLINE); + } - StringBuffer buffer = new StringBuffer(); - buffer.append(stomp.getAction()); - buffer.append(Stomp.NEWLINE); - - // Output the headers. - for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) { - Map.Entry entry = (Map.Entry) iter.next(); - buffer.append(entry.getKey()); - buffer.append(Stomp.Headers.SEPERATOR); - buffer.append(entry.getValue()); - buffer.append(Stomp.NEWLINE); - } - - // Add a newline to seperate the headers from the content. - buffer.append(Stomp.NEWLINE); - - os.write(buffer.toString().getBytes("UTF-8")); - os.write(stomp.getContent()); - os.write(END_OF_FRAME); - } - + // Add a newline to seperate the headers from the content. + buffer.append(Stomp.NEWLINE); + + os.write(buffer.toString().getBytes("UTF-8")); + os.write(stomp.getContent()); + os.write(END_OF_FRAME); + } public Object unmarshal(DataInput in) throws IOException { - + try { - String action = null; - - // skip white space to next real action line - while (true) { - action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); - if (action == null) { - throw new IOException("connection was closed"); - } else { - action = action.trim(); - if (action.length() > 0) { - break; - } - } - } - - // Parse the headers - HashMap headers = new HashMap(25); - while (true) { - String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); - if (line != null && line.trim().length() > 0) { - - if( headers.size() > MAX_HEADERS ) - throw new ProtocolException("The maximum number of headers was exceeded", true); - - try { - int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); - String name = line.substring(0, seperator_index).trim(); - String value = line.substring(seperator_index + 1, line.length()).trim(); - headers.put(name, value); - } - catch (Exception e) { - throw new ProtocolException("Unable to parser header line [" + line + "]", true); - } - } - else { - break; - } - } - - // Read in the data part. - byte[] data = NO_DATA; - String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH); - if (contentLength!=null) { - - // Bless the client, he's telling us how much data to read in. - int length; - try { - length = Integer.parseInt(contentLength.trim()); - } catch (NumberFormatException e) { - throw new ProtocolException("Specified content-length is not a valid integer", true); - } - - if( length > MAX_DATA_LENGTH ) - throw new ProtocolException("The maximum data length was exceeded", true); - - data = new byte[length]; - in.readFully(data); - - if (in.readByte() != 0) { - throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true); - } - - } else { - - // We don't know how much to read.. data ends when we hit a 0 - byte b; - ByteArrayOutputStream baos=null; - while ((b = in.readByte()) != 0) { - - if( baos == null ) { - baos = new ByteArrayOutputStream(); - } else if( baos.size() > MAX_DATA_LENGTH ) { - throw new ProtocolException("The maximum data length was exceeded", true); - } - - baos.write(b); - } - - if( baos!=null ) { - baos.close(); - data = baos.toByteArray(); - } - - } - - return new StompFrame(action, headers, data); - - } catch (ProtocolException e) { - return new StompFrameError(e); - } + String action = null; + + // skip white space to next real action line + while (true) { + action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); + if (action == null) { + throw new IOException("connection was closed"); + } else { + action = action.trim(); + if (action.length() > 0) { + break; + } + } + } + + // Parse the headers + HashMap headers = new HashMap(25); + while (true) { + String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); + if (line != null && line.trim().length() > 0) { + + if (headers.size() > MAX_HEADERS) + throw new ProtocolException("The maximum number of headers was exceeded", true); + + try { + int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); + String name = line.substring(0, seperator_index).trim(); + String value = line.substring(seperator_index + 1, line.length()).trim(); + headers.put(name, value); + } catch (Exception e) { + throw new ProtocolException("Unable to parser header line [" + line + "]", true); + } + } else { + break; + } + } + + // Read in the data part. + byte[] data = NO_DATA; + String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH); + if (contentLength != null) { + + // Bless the client, he's telling us how much data to read in. + int length; + try { + length = Integer.parseInt(contentLength.trim()); + } catch (NumberFormatException e) { + throw new ProtocolException("Specified content-length is not a valid integer", true); + } + + if (length > MAX_DATA_LENGTH) + throw new ProtocolException("The maximum data length was exceeded", true); + + data = new byte[length]; + in.readFully(data); + + if (in.readByte() != 0) { + throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true); + } + + } else { + + // We don't know how much to read.. data ends when we hit a 0 + byte b; + ByteArrayOutputStream baos = null; + while ((b = in.readByte()) != 0) { + + if (baos == null) { + baos = new ByteArrayOutputStream(); + } else if (baos.size() > MAX_DATA_LENGTH) { + throw new ProtocolException("The maximum data length was exceeded", true); + } + + baos.write(b); + } + + if (baos != null) { + baos.close(); + data = baos.toByteArray(); + } + + } + + return new StompFrame(action, headers, data); + + } catch (ProtocolException e) { + return new StompFrameError(e); + } } private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException { byte b; - ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength); + ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength); while ((b = in.readByte()) != '\n') { - if( baos.size() > maxLength ) - throw new ProtocolException(errorMessage, true); + if (baos.size() > maxLength) + throw new ProtocolException(errorMessage, true); baos.write(b); } baos.close(); ByteSequence sequence = baos.toByteSequence(); - return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8"); - } + return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8"); + } - public int getVersion() { + public int getVersion() { return version; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java Wed Aug 8 11:56:59 2007 @@ -33,13 +33,13 @@ /** * A Transport class that uses SSL and client-side certificate authentication. - * - * Client-side certificate authentication must be enabled through the constructor. - * By default, this class will have the same client authentication behavior as the socket it is passed. - * This class will set ConnectionInfo's transportContext to the SSL certificates of the client. - * NOTE: Accessor method for needClientAuth was not provided on purpose. This is because needClientAuth's value must be - * set before the socket is connected. Otherwise, unexpected situations may occur. - * + * Client-side certificate authentication must be enabled through the + * constructor. By default, this class will have the same client authentication + * behavior as the socket it is passed. This class will set ConnectionInfo's + * transportContext to the SSL certificates of the client. NOTE: Accessor method + * for needClientAuth was not provided on purpose. This is because + * needClientAuth's value must be set before the socket is connected. Otherwise, + * unexpected situations may occur. */ public class SslTransport extends TcpTransport { /** @@ -47,11 +47,11 @@ * * @param wireFormat The WireFormat to be used. * @param socketFactory The socket factory to be used. Forcing SSLSockets - * for obvious reasons. + * for obvious reasons. * @param remoteLocation The remote location. * @param localLocation The local location. * @param needClientAuth If set to true, the underlying socket will need - * client certificate authentication. + * client certificate authentication. * @throws UnknownHostException If TcpTransport throws. * @throws IOException If TcpTransport throws. */ @@ -61,12 +61,10 @@ ((SSLSocket)this.socket).setNeedClientAuth(needClientAuth); } } - + /** - * Initialize from a ServerSocket. - * - * No access to needClientAuth is given since it is already set within the - * provided socket. + * Initialize from a ServerSocket. No access to needClientAuth is given + * since it is already set within the provided socket. * * @param wireFormat The WireFormat to be used. * @param socket The Socket to be used. Forcing SSL. @@ -75,31 +73,31 @@ public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException { super(wireFormat, socket); } - + /** - * Overriding in order to add the client's certificates to ConnectionInfo Commmands. + * Overriding in order to add the client's certificates to ConnectionInfo + * Commmands. * * @param command The Command coming in. */ public void doConsume(Command command) { // The instanceof can be avoided, but that would require modifying the - // Command clas tree and that would require too much effort right - // now. - if ( command instanceof ConnectionInfo ) { + // Command clas tree and that would require too much effort right + // now. + if (command instanceof ConnectionInfo) { ConnectionInfo connectionInfo = (ConnectionInfo)command; - + SSLSocket sslSocket = (SSLSocket)this.socket; - + SSLSession sslSession = sslSocket.getSession(); - + X509Certificate[] clientCertChain; try { - clientCertChain = - (X509Certificate[]) sslSession.getPeerCertificates(); - } catch(SSLPeerUnverifiedException e) { + clientCertChain = (X509Certificate[])sslSession.getPeerCertificates(); + } catch (SSLPeerUnverifiedException e) { clientCertChain = null; } - + connectionInfo.setTransportContext(clientCertChain); } @@ -110,8 +108,7 @@ * @return pretty print of 'this' */ public String toString() { - return "ssl://"+socket.getInetAddress()+":"+socket.getPort(); + return "ssl://" + socket.getInetAddress() + ":" + socket.getPort(); } } - Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Wed Aug 8 11:56:59 2007 @@ -49,10 +49,10 @@ import javax.net.ssl.TrustManager; /** - * An implementation of the TcpTransportFactory using SSL. - * - * The major contribution from this class is that it is aware of SslTransportServer and SslTransport classes. - * All Transports and TransportServers created from this factory will have their needClientAuth option set to false. + * An implementation of the TcpTransportFactory using SSL. The major + * contribution from this class is that it is aware of SslTransportServer and + * SslTransport classes. All Transports and TransportServers created from this + * factory will have their needClientAuth option set to false. * * @author sepandm@gmail.com (Sepand) * @version $Revision: $ @@ -60,17 +60,16 @@ public class SslTransportFactory extends TcpTransportFactory { // The context used to creat ssl sockets. private SSLContext sslContext = null; - + // The log this uses., private static final Log log = LogFactory.getLog(SslTransportFactory.class); - + /** * Constructor. Nothing special. - * */ public SslTransportFactory() { } - + /** * Overriding to use SslTransportServer and allow for proper reflection. */ @@ -79,31 +78,29 @@ Map options = new HashMap(URISupport.parseParamters(location)); ServerSocketFactory serverSocketFactory = createServerSocketFactory(); - SslTransportServer server = - new SslTransportServer(this, location, (SSLServerSocketFactory)serverSocketFactory); + SslTransportServer server = new SslTransportServer(this, location, (SSLServerSocketFactory)serverSocketFactory); server.setWireFormatFactory(createWireFormatFactory(options)); IntrospectionSupport.setProperties(server, options); Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); server.setTransportOption(transportOptions); server.bind(); - + return server; - } - catch (URISyntaxException e) { + } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } } - + /** * Overriding to allow for proper configuration through reflection. */ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - - SslTransport sslTransport = (SslTransport) transport.narrow(SslTransport.class); + + SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class); IntrospectionSupport.setProperties(sslTransport, options); - + Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); - + sslTransport.setSocketOptions(socketOptions); if (sslTransport.isTrace()) { @@ -116,14 +113,14 @@ if (format instanceof OpenWireFormat) { transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, sslTransport.getMinmumWireFormatVersion()); } - + return transport; } - + /** * Overriding to use SslTransports. */ - protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{ + protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { URI localLocation = null; String path = location.getPath(); // see if the path is a local URI location @@ -133,18 +130,16 @@ Integer.parseInt(path.substring((localPortIndex + 1), path.length())); String localString = location.getScheme() + ":/" + path; localLocation = new URI(localString); - } - catch (Exception e) { + } catch (Exception e) { log.warn("path isn't a valid local location for SslTransport to use", e); } } SocketFactory socketFactory = createSocketFactory(); return new SslTransport(wf, (SSLSocketFactory)socketFactory, location, localLocation, false); } - + /** * Sets the key and trust managers used in constructed socket factories. - * * Passes given arguments to SSLContext.init(...). * * @param km The sources of authentication keys or null. @@ -161,36 +156,31 @@ } sslContext.init(km, tm, random); } - + /** - * Creates a new SSL ServerSocketFactory. - * - * The given factory will use user-provided key and trust managers (if the user provided them). + * Creates a new SSL ServerSocketFactory. The given factory will use + * user-provided key and trust managers (if the user provided them). * * @return Newly created (Ssl)ServerSocketFactory. */ protected ServerSocketFactory createServerSocketFactory() { if (sslContext == null) { return SSLServerSocketFactory.getDefault(); - } - else + } else return sslContext.getServerSocketFactory(); } /** - * Creates a new SSL SocketFactory. - * - * The given factory will use user-provided key and trust managers (if the user provided them). + * Creates a new SSL SocketFactory. The given factory will use user-provided + * key and trust managers (if the user provided them). * * @return Newly created (Ssl)SocketFactory. */ protected SocketFactory createSocketFactory() { - if ( sslContext == null ) { + if (sslContext == null) { return SSLSocketFactory.getDefault(); - } - else + } else return sslContext.getSocketFactory(); } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Wed Aug 8 11:56:59 2007 @@ -19,107 +19,108 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; + /** * An optimized buffered input stream for Tcp * * @version $Revision: 1.1.1.1 $ */ -public class TcpBufferedInputStream extends FilterInputStream{ - private static final int DEFAULT_BUFFER_SIZE=8192; +public class TcpBufferedInputStream extends FilterInputStream { + private static final int DEFAULT_BUFFER_SIZE = 8192; protected byte internalBuffer[]; protected int count; protected int position; - public TcpBufferedInputStream(InputStream in){ - this(in,DEFAULT_BUFFER_SIZE); + public TcpBufferedInputStream(InputStream in) { + this(in, DEFAULT_BUFFER_SIZE); } - public TcpBufferedInputStream(InputStream in,int size){ + public TcpBufferedInputStream(InputStream in, int size) { super(in); - if(size<=0){ + if (size <= 0) { throw new IllegalArgumentException("Buffer size <= 0"); } - internalBuffer=new byte[size]; + internalBuffer = new byte[size]; } - private void fill() throws IOException{ - byte[] buffer=internalBuffer; - count=position=0; - int n=in.read(buffer,position,buffer.length-position); - if(n>0) - count=n+position; + private void fill() throws IOException { + byte[] buffer = internalBuffer; + count = position = 0; + int n = in.read(buffer, position, buffer.length - position); + if (n > 0) + count = n + position; } - public int read() throws IOException{ - if(position>=count){ + public int read() throws IOException { + if (position >= count) { fill(); - if(position>=count) + if (position >= count) return -1; } - return internalBuffer[position++]&0xff; + return internalBuffer[position++] & 0xff; } - private int readStream(byte[] b,int off,int len) throws IOException{ - int avail=count-position; - if(avail<=0){ - if(len>=internalBuffer.length){ - return in.read(b,off,len); + private int readStream(byte[] b, int off, int len) throws IOException { + int avail = count - position; + if (avail <= 0) { + if (len >= internalBuffer.length) { + return in.read(b, off, len); } fill(); - avail=count-position; - if(avail<=0) + avail = count - position; + if (avail <= 0) return -1; } - int cnt=(avail=len) + int n = 0; + for (;;) { + int nread = readStream(b, off + n, len - n); + if (nread <= 0) + return (n == 0) ? nread : n; + n += nread; + if (n >= len) return n; // if not closed but no bytes available, return - InputStream input=in; - if(input!=null&&input.available()<=0) + InputStream input = in; + if (input != null && input.available() <= 0) return n; } } - public long skip(long n) throws IOException{ - if(n<=0){ + public long skip(long n) throws IOException { + if (n <= 0) { return 0; } - long avail=count-position; - if(avail<=0){ + long avail = count - position; + if (avail <= 0) { return in.skip(n); } - long skipped=(avail= len) { System.arraycopy(b, off, buffer, count, len); count += len; - } - else { + } else { out.write(b, off, len); } } /** - * flush the data to the output stream - * This doesn't call flush on the underlying outputstream, because - * Tcp is particularly efficent at doing this itself .... - * + * flush the data to the output stream This doesn't call flush on the + * underlying outputstream, because Tcp is particularly efficent at doing + * this itself .... + * * @throws IOException */ public void flush() throws IOException { @@ -112,7 +110,7 @@ /** * close this stream - * + * * @throws IOException */ public void close() throws IOException { @@ -120,10 +118,9 @@ closed = true; } - /** * Checks that the stream has not been closed - * + * * @throws IOException */ private final void checkClosed() throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Aug 8 11:56:59 2007 @@ -45,7 +45,7 @@ /** * An implementation of the {@link Transport} interface using raw tcp/ip - * + * * @version $Revision$ */ public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { @@ -74,12 +74,11 @@ /** * Connect to a remote Node - e.g. a Broker - * + * * @param wireFormat * @param socketFactory * @param remoteLocation - * @param localLocation - - * e.g. local InetAddress and local port + * @param localLocation - e.g. local InetAddress and local port * @throws IOException * @throws UnknownHostException */ @@ -88,8 +87,7 @@ this.socketFactory = socketFactory; try { this.socket = socketFactory.createSocket(); - } - catch (SocketException e) { + } catch (SocketException e) { this.socket = null; } this.remoteLocation = remoteLocation; @@ -97,10 +95,9 @@ setDaemon(false); } - /** * Initialize from a server Socket - * + * * @param wireFormat * @param socket * @throws IOException @@ -135,28 +132,25 @@ public void run() { log.trace("TCP consumer thread starting"); try { - while (!isStopped()) { - doRun(); - } + while (!isStopped()) { + doRun(); + } } catch (IOException e) { - stoppedLatch.get().countDown(); + stoppedLatch.get().countDown(); onException(e); } finally { - stoppedLatch.get().countDown(); + stoppedLatch.get().countDown(); } } - - protected void doRun() throws IOException { - try { - Object command = readCommand(); - doConsume(command); - } - catch (SocketTimeoutException e) { - } - catch (InterruptedIOException e) { - } - } + protected void doRun() throws IOException { + try { + Object command = readCommand(); + doConsume(command); + } catch (SocketTimeoutException e) { + } catch (InterruptedIOException e) { + } + } protected Object readCommand() throws IOException { return wireFormat.unmarshal(dataIn); @@ -248,22 +242,21 @@ public void setTcpNoDelay(Boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } - + /** * @return the ioBufferSize */ - public int getIoBufferSize(){ + public int getIoBufferSize() { return this.ioBufferSize; } /** * @param ioBufferSize the ioBufferSize to set */ - public void setIoBufferSize(int ioBufferSize){ - this.ioBufferSize=ioBufferSize; + public void setIoBufferSize(int ioBufferSize) { + this.ioBufferSize = ioBufferSize; } - // Implementation methods // ------------------------------------------------------------------------- protected String resolveHostName(String host) throws UnknownHostException { @@ -278,7 +271,7 @@ /** * Configures the socket for use - * + * * @param sock * @throws SocketException */ @@ -290,8 +283,7 @@ try { sock.setReceiveBufferSize(socketBufferSize); sock.setSendBufferSize(socketBufferSize); - } - catch (SocketException se) { + } catch (SocketException se) { log.warn("Cannot set socket buffer size = " + socketBufferSize); log.debug("Cannot set socket buffer size. Reason: " + se, se); } @@ -340,20 +332,17 @@ if (remoteAddress != null) { if (connectionTimeout >= 0) { socket.connect(remoteAddress, connectionTimeout); - } - else { + } else { socket.connect(remoteAddress); } } - } - else { + } else { // For SSL sockets.. you can't create an unconnected socket :( // This means the timout option are not supported either. if (localAddress != null) { socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort()); - } - else { + } else { socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); } } @@ -362,7 +351,6 @@ initializeStreams(); } - protected void doStop(ServiceStopper stopper) throws Exception { if (log.isDebugEnabled()) { log.debug("Stopping transport " + this); @@ -375,18 +363,17 @@ socket.close(); } } - - + /** * Override so that stop() blocks until the run thread is no longer running. */ @Override public void stop() throws Exception { - super.stop(); - CountDownLatch countDownLatch = stoppedLatch.get(); - if( countDownLatch!=null ) { - countDownLatch.await(); - } + super.stop(); + CountDownLatch countDownLatch = stoppedLatch.get(); + if (countDownLatch != null) { + countDownLatch.await(); + } } protected void initializeStreams() throws Exception { @@ -416,7 +403,4 @@ return null; } - - - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Aug 8 11:56:59 2007 @@ -42,6 +42,7 @@ public class TcpTransportFactory extends TransportFactory { private static final Log log = LogFactory.getLog(TcpTransportFactory.class); + public TransportServer doBind(String brokerId, final URI location) throws IOException { try { Map options = new HashMap(URISupport.parseParamters(location)); @@ -53,16 +54,16 @@ Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); server.setTransportOption(transportOptions); server.bind(); - + return server; - } - catch (URISyntaxException e) { + } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } } /** - * Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer. + * Allows subclasses of TcpTransportFactory to create custom instances of + * TcpTransportServer. * * @param location * @param serverSocketFactory @@ -70,16 +71,16 @@ * @throws IOException * @throws URISyntaxException */ - protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new TcpTransportServer(this, location, serverSocketFactory); - } + protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory); + } public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - - TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class); + + TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class); IntrospectionSupport.setProperties(tcpTransport, options); - - Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); + + Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); tcpTransport.setSocketOptions(socketOptions); if (tcpTransport.isTrace()) { @@ -91,10 +92,10 @@ } // Only need the WireFormatNegotiator if using openwire - if( format instanceof OpenWireFormat ) { - transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); + if (format instanceof OpenWireFormat) { + transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); } - + return transport; } @@ -105,9 +106,9 @@ return true; } - protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{ - URI localLocation=null; - String path=location.getPath(); + protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { + URI localLocation = null; + String path = location.getPath(); // see if the path is a local URI location if (path != null && path.length() > 0) { int localPortIndex = path.indexOf(':'); @@ -115,8 +116,7 @@ Integer.parseInt(path.substring((localPortIndex + 1), path.length())); String localString = location.getScheme() + ":/" + path; localLocation = new URI(localString); - } - catch (Exception e) { + } catch (Exception e) { log.warn("path isn't a valid local location for TcpTransport to use", e); } } @@ -125,19 +125,20 @@ } /** - * Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances. + * Allows subclasses of TcpTransportFactory to provide a create custom + * TcpTransport intances. * * @param location * @param wf * @param socketFactory - * @param localLocation + * @param localLocation * @return * @throws UnknownHostException * @throws IOException */ - protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { - return new TcpTransport(wf, socketFactory, location, localLocation); - } + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new TcpTransport(wf, socketFactory, location, localLocation); + } protected ServerSocketFactory createServerSocketFactory() { return ServerSocketFactory.getDefault(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Aug 8 11:56:59 2007 @@ -49,7 +49,7 @@ */ public class TcpTransportServer extends TransportServerThreadSupport { - + private static final Log log = LogFactory.getLog(TcpTransportServer.class); protected ServerSocket serverSocket; protected int backlog = 5000; @@ -60,16 +60,16 @@ protected boolean trace; protected Map transportOptions; protected final ServerSocketFactory serverSocketFactory; - + public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); - this.transportFactory=transportFactory; - this.serverSocketFactory = serverSocketFactory; + this.transportFactory = transportFactory; + this.serverSocketFactory = serverSocketFactory; } public void bind() throws IOException { - URI bind = getBindLocation(); - + URI bind = getBindLocation(); + String host = bind.getHost(); host = (host == null || host.length() == 0) ? "localhost" : host; InetAddress addr = InetAddress.getByName(host); @@ -77,31 +77,29 @@ try { if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog); - } - else { + } else { this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); } this.serverSocket.setSoTimeout(2000); - } - catch (IOException e) { + } catch (IOException e) { throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); } try { - setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(), - bind.getQuery(), bind.getFragment())); - } catch (URISyntaxException e) { + setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind + .getFragment())); + } catch (URISyntaxException e) { - // it could be that the host name contains invalid characters such as _ on unix platforms + // it could be that the host name contains invalid characters such + // as _ on unix platforms // so lets try use the IP address instead try { - setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), - bind.getQuery(), bind.getFragment())); + setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment())); } catch (URISyntaxException e2) { throw IOExceptionSupport.create(e2); } } } - + /** * @return Returns the wireFormatFactory. */ @@ -110,8 +108,7 @@ } /** - * @param wireFormatFactory - * The wireFormatFactory to set. + * @param wireFormatFactory The wireFormatFactory to set. */ public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { this.wireFormatFactory = wireFormatFactory; @@ -161,8 +158,7 @@ if (socket != null) { if (isStopped() || getAcceptListener() == null) { socket.close(); - } - else { + } else { HashMap options = new HashMap(); options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); @@ -174,13 +170,11 @@ getAcceptListener().onAccept(configuredTransport); } } - } - catch (SocketTimeoutException ste) { + } catch (SocketTimeoutException ste) { // expect this to happen - } - catch (Exception e) { + } catch (Exception e) { if (!isStopping()) { - onAcceptError(e); + onAcceptError(e); } else if (!isStopped()) { log.warn("run()", e); onAcceptError(e); @@ -190,25 +184,26 @@ } /** - * Allow derived classes to override the Transport implementation that this transport server creates. + * Allow derived classes to override the Transport implementation that this + * transport server creates. + * * @param socket * @param format * @return * @throws IOException */ - protected Transport createTransport(Socket socket, WireFormat format) throws IOException { - return new TcpTransport(format, socket); - } + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + return new TcpTransport(format, socket); + } /** * @return pretty print of this */ public String toString() { - return ""+getBindLocation(); + return "" + getBindLocation(); } /** - * * @param hostName * @return real hostName * @throws UnknownHostException Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Wed Aug 8 11:56:59 2007 @@ -56,8 +56,8 @@ private int defaultMarshalBufferSize = 64 * 1024; public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, - SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel, - ByteBufferPool bufferPool) { + SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, + DatagramChannel channel, ByteBufferPool bufferPool) { super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); this.channel = channel; this.bufferPool = bufferPool; @@ -97,7 +97,7 @@ // the ByteBuffer to avoid object allocation and unnecessary // buffering? DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); - answer = (Command) wireFormat.unmarshal(dataIn); + answer = (Command)wireFormat.unmarshal(dataIn); break; } } @@ -151,15 +151,14 @@ // lets remove the header of the partial command // which is the byte for the type and an int for the size of // the byte[] - chunkSize -= 1 // the data type - + 4 // the command ID - + 4; // the size of the partial data + + // data type + the command ID + size of the partial data + chunkSize -= 1 + 4 + 4; // the boolean flags if (bs != null) { chunkSize -= bs.marshalledSize(); - } - else { + } else { chunkSize -= 1; } @@ -176,8 +175,7 @@ if (lastFragment) { writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE); - } - else { + } else { writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); } @@ -191,7 +189,7 @@ } writeBuffer.putInt(commandId); if (bs == null) { - writeBuffer.put((byte) 1); + writeBuffer.put((byte)1); } // size of byte array @@ -203,8 +201,7 @@ offset += chunkSize; sendWriteBuffer(commandId, address, writeBuffer, false); } - } - else { + } else { writeBuffer.put(data); sendWriteBuffer(command.getCommandId(), address, writeBuffer, false); } @@ -227,15 +224,15 @@ // Implementation methods // ------------------------------------------------------------------------- - protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) - throws IOException { + protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, + boolean redelivery) throws IOException { // lets put the datagram into the replay buffer first to prevent timing // issues ReplayBuffer bufferCache = getReplayBuffer(); if (bufferCache != null && !redelivery) { bufferCache.addBuffer(commandId, writeBuffer); } - + writeBuffer.flip(); if (log.isDebugEnabled()) { @@ -247,10 +244,9 @@ public void sendBuffer(int commandId, Object buffer) throws IOException { if (buffer != null) { - ByteBuffer writeBuffer = (ByteBuffer) buffer; + ByteBuffer writeBuffer = (ByteBuffer)buffer; sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true); - } - else { + } else { if (log.isWarnEnabled()) { log.warn("Request for buffer: " + commandId + " is no longer present"); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Wed Aug 8 11:56:59 2007 @@ -48,8 +48,8 @@ private Object readLock = new Object(); private Object writeLock = new Object(); - public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, - SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) { + public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, + DatagramSocket channel) { super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); this.channel = channel; } @@ -73,7 +73,7 @@ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData())); from = headerMarshaller.createEndpoint(datagram, dataIn); - answer = (Command) wireFormat.unmarshal(dataIn); + answer = (Command)wireFormat.unmarshal(dataIn); break; } } @@ -100,8 +100,7 @@ if (remaining(writeBuffer) >= 0) { sendWriteBuffer(address, writeBuffer, command.getCommandId()); - } - else { + } else { // lets split the command up into chunks byte[] data = writeBuffer.toByteArray(); boolean lastFragment = false; @@ -125,15 +124,14 @@ // lets remove the header of the partial command // which is the byte for the type and an int for the size of // the byte[] - chunkSize -= 1 // the data type - + 4 // the command ID - + 4; // the size of the partial data + + // data type + the command ID + size of the partial data + chunkSize -= 1 + 4 + 4; // the boolean flags if (bs != null) { chunkSize -= bs.marshalledSize(); - } - else { + } else { chunkSize -= 1; } @@ -150,8 +148,7 @@ if (lastFragment) { dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE); - } - else { + } else { dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE); } @@ -165,7 +162,7 @@ } dataOut.writeInt(commandId); if (bs == null) { - dataOut.write((byte) 1); + dataOut.write((byte)1); } // size of byte array @@ -191,14 +188,12 @@ // Implementation methods // ------------------------------------------------------------------------- - protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) - throws IOException { + protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException { byte[] data = writeBuffer.toByteArray(); sendWriteBuffer(commandId, address, data, false); } - protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) - throws IOException { + protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException { // lets put the datagram into the replay buffer first to prevent timing // issues ReplayBuffer bufferCache = getReplayBuffer(); @@ -216,10 +211,9 @@ public void sendBuffer(int commandId, Object buffer) throws IOException { if (buffer != null) { - byte[] data = (byte[]) buffer; + byte[] data = (byte[])buffer; sendWriteBuffer(commandId, replayAddress, data, true); - } - else { + } else { if (log.isWarnEnabled()) { log.warn("Request for buffer: " + commandId + " is no longer present"); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Wed Aug 8 11:56:59 2007 @@ -52,8 +52,8 @@ public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable { private static final Log log = LogFactory.getLog(UdpTransport.class); - private static final int MAX_BIND_ATTEMPTS = 50; - private static final long BIND_ATTEMPT_DELAY = 100; + private static final int MAX_BIND_ATTEMPTS = 50; + private static final long BIND_ATTEMPT_DELAY = 100; private CommandChannel commandChannel; private OpenWireFormat wireFormat; @@ -98,12 +98,11 @@ this.description = getProtocolName() + "Server@"; } - /** * Creates a replayer for working with the reliable transport */ public Replayer createReplayer() throws IOException { - if (replayEnabled ) { + if (replayEnabled) { return getCommandChannel(); } return null; @@ -124,7 +123,7 @@ log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command); } checkStarted(); - commandChannel.write((Command) command, address); + commandChannel.write((Command)command, address); } /** @@ -133,8 +132,7 @@ public String toString() { if (description != null) { return description + port; - } - else { + } else { return getProtocolUriScheme() + targetAddress + "@" + port; } } @@ -148,47 +146,38 @@ try { Command command = commandChannel.read(); doConsume(command); - } - catch (AsynchronousCloseException e) { + } catch (AsynchronousCloseException e) { // DatagramChannel closed try { stop(); - } - catch (Exception e2) { + } catch (Exception e2) { log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); } - } - catch (SocketException e) { + } catch (SocketException e) { // DatagramSocket closed log.debug("Socket closed: " + e, e); try { stop(); - } - catch (Exception e2) { + } catch (Exception e2) { log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); } - } - catch (EOFException e) { + } catch (EOFException e) { // DataInputStream closed log.debug("Socket closed: " + e, e); try { stop(); - } - catch (Exception e2) { + } catch (Exception e2) { log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); } - } - catch (Exception e) { + } catch (Exception e) { try { stop(); - } - catch (Exception e2) { + } catch (Exception e2) { log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2); } if (e instanceof IOException) { - onException((IOException) e); - } - else { + onException((IOException)e); + } else { log.error("Caught: " + e, e); e.printStackTrace(); } @@ -204,7 +193,7 @@ */ public void setTargetEndpoint(Endpoint newTarget) { if (newTarget instanceof DatagramEndpoint) { - DatagramEndpoint endpoint = (DatagramEndpoint) newTarget; + DatagramEndpoint endpoint = (DatagramEndpoint)newTarget; SocketAddress address = endpoint.getAddress(); if (address != null) { if (originalTargetAddress == null) { @@ -305,14 +294,15 @@ public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) { this.sequenceGenerator = sequenceGenerator; } - + public boolean isReplayEnabled() { return replayEnabled; } /** - * Sets whether or not replay should be enabled when using the reliable transport. - * i.e. should we maintain a buffer of messages that can be replayed? + * Sets whether or not replay should be enabled when using the reliable + * transport. i.e. should we maintain a buffer of messages that can be + * replayed? */ public void setReplayEnabled(boolean replayEnabled) { this.replayEnabled = replayEnabled; @@ -328,7 +318,7 @@ public void setBufferPool(ByteBufferPool bufferPool) { this.bufferPool = bufferPool; } - + public ReplayBuffer getReplayBuffer() { return replayBuffer; } @@ -338,7 +328,6 @@ getCommandChannel().setReplayBuffer(replayBuffer); } - // Implementation methods // ------------------------------------------------------------------------- @@ -391,26 +380,28 @@ if (log.isDebugEnabled()) { log.debug("Binding to address: " + localAddress); } - + // - // We have noticed that on some platfoms like linux, after you close down - // a previously bound socket, it can take a little while before we can bind it again. + // We have noticed that on some platfoms like linux, after you close + // down + // a previously bound socket, it can take a little while before we can + // bind it again. // - for(int i=0; i < MAX_BIND_ATTEMPTS; i++){ - try { - socket.bind(localAddress); - return; - } catch (BindException e) { - if ( i+1 == MAX_BIND_ATTEMPTS ) - throw e; - try { - Thread.sleep(BIND_ATTEMPT_DELAY); - } catch (InterruptedException e1) { + for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) { + try { + socket.bind(localAddress); + return; + } catch (BindException e) { + if (i + 1 == MAX_BIND_ATTEMPTS) + throw e; + try { + Thread.sleep(BIND_ATTEMPT_DELAY); + } catch (InterruptedException e1) { Thread.currentThread().interrupt(); - throw e; - } - } - } + throw e; + } + } + } } @@ -457,17 +448,17 @@ } public InetSocketAddress getLocalSocketAddress() { - if( channel==null ) { + if (channel == null) { return null; } else { return (InetSocketAddress)channel.socket().getLocalSocketAddress(); } } - public String getRemoteAddress() { - if(targetAddress != null){ - return "" + targetAddress; - } - return null; - } + public String getRemoteAddress() { + if (targetAddress != null) { + return "" + targetAddress; + } + return null; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Aug 8 11:56:59 2007 @@ -41,178 +41,176 @@ * * @version $Revision$ */ -public class VMTransport implements Transport,Task{ +public class VMTransport implements Transport, Task { - private static final Log log=LogFactory.getLog(VMTransport.class); - private static final AtomicLong nextId=new AtomicLong(0); - private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY, - true,1000); + private static final Log log = LogFactory.getLog(VMTransport.class); + private static final AtomicLong nextId = new AtomicLong(0); + private static final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000); protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; protected boolean marshal; protected boolean network; - protected boolean async=true; - protected int asyncQueueDepth=2000; - protected LinkedBlockingQueue messageQueue=null; + protected boolean async = true; + protected int asyncQueueDepth = 2000; + protected LinkedBlockingQueue messageQueue = null; protected boolean started; protected final URI location; protected final long id; private TaskRunner taskRunner; - private final Object mutex=new Object(); + private final Object mutex = new Object(); - public VMTransport(URI location){ - this.location=location; - this.id=nextId.getAndIncrement(); + public VMTransport(URI location) { + this.location = location; + this.id = nextId.getAndIncrement(); } - public VMTransport getPeer(){ - synchronized(mutex){ + public VMTransport getPeer() { + synchronized (mutex) { return peer; } } - public void setPeer(VMTransport peer){ - synchronized(mutex){ - this.peer=peer; + public void setPeer(VMTransport peer) { + synchronized (mutex) { + this.peer = peer; } } - public void oneway(Object command) throws IOException{ - if(disposed){ + public void oneway(Object command) throws IOException { + if (disposed) { throw new TransportDisposedIOException("Transport disposed."); } - if(peer==null) + if (peer == null) throw new IOException("Peer not connected."); - TransportListener tl=null; - synchronized(peer.mutex) { - if( peer.disposed ) { - throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed."); - } - if( peer.started ) { - if(peer.async){ + TransportListener tl = null; + synchronized (peer.mutex) { + if (peer.disposed) { + throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); + } + if (peer.started) { + if (peer.async) { peer.enqueue(command); - peer.wakeup(); + peer.wakeup(); } else { - tl = peer.transportListener; + tl = peer.transportListener; } - } else { - peer.enqueue(command); - } - } - - if( tl!=null ) { - tl.onCommand(command); - } - - } - - private void enqueue(Object command) throws IOException { - try{ - getMessageQueue().put(command); - }catch(final InterruptedException e){ - throw IOExceptionSupport.create(e); - } - } + } else { + peer.enqueue(command); + } + } + + if (tl != null) { + tl.onCommand(command); + } + + } + + private void enqueue(Object command) throws IOException { + try { + getMessageQueue().put(command); + } catch (final InterruptedException e) { + throw IOExceptionSupport.create(e); + } + } - public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{ + public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); } - public Object request(Object command) throws IOException{ + public Object request(Object command) throws IOException { throw new AssertionError("Unsupported Method"); } - public Object request(Object command,int timeout) throws IOException{ + public Object request(Object command, int timeout) throws IOException { throw new AssertionError("Unsupported Method"); } - public TransportListener getTransportListener(){ - synchronized(mutex){ + public TransportListener getTransportListener() { + synchronized (mutex) { return transportListener; } } - public void setTransportListener(TransportListener commandListener){ - synchronized(mutex){ - this.transportListener=commandListener; + public void setTransportListener(TransportListener commandListener) { + synchronized (mutex) { + this.transportListener = commandListener; wakeup(); } } private LinkedBlockingQueue getMessageQueue() { - synchronized(mutex) { - if( messageQueue==null ) { - messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); - } - return messageQueue; - } - } - - - public void start() throws Exception{ - if(transportListener==null) + synchronized (mutex) { + if (messageQueue == null) { + messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth); + } + return messageQueue; + } + } + + public void start() throws Exception { + if (transportListener == null) throw new IOException("TransportListener not set."); - - synchronized(mutex) { - if( messageQueue!=null ) { - Object command; - while( (command = messageQueue.poll()) !=null ) { - transportListener.onCommand(command); - } - } - started = true; + + synchronized (mutex) { + if (messageQueue != null) { + Object command; + while ((command = messageQueue.poll()) != null) { + transportListener.onCommand(command); + } + } + started = true; wakeup(); } } - public void stop() throws Exception{ - TaskRunner tr=null; - synchronized(mutex) { - if(!disposed){ - started=false; - disposed=true; - if(taskRunner!=null){ - tr = taskRunner; - taskRunner=null; + public void stop() throws Exception { + TaskRunner tr = null; + synchronized (mutex) { + if (!disposed) { + started = false; + disposed = true; + if (taskRunner != null) { + tr = taskRunner; + taskRunner = null; } } } - if( tr !=null ) { - tr.shutdown(1000); - } + if (tr != null) { + tr.shutdown(1000); + } } - public Object narrow(Class target){ - if(target.isAssignableFrom(getClass())){ + public Object narrow(Class target) { + if (target.isAssignableFrom(getClass())) { return this; } return null; } - public boolean isMarshal(){ + public boolean isMarshal() { return marshal; } - public void setMarshal(boolean marshal){ - this.marshal=marshal; + public void setMarshal(boolean marshal) { + this.marshal = marshal; } - public boolean isNetwork(){ + public boolean isNetwork() { return network; } - public void setNetwork(boolean network){ - this.network=network; + public void setNetwork(boolean network) { + this.network = network; } - public String toString(){ - return location+"#"+id; + public String toString() { + return location + "#" + id; } - public String getRemoteAddress(){ - if(peer!=null){ + public String getRemoteAddress() { + if (peer != null) { return peer.toString(); } return null; @@ -221,68 +219,68 @@ /** * @see org.apache.activemq.thread.Task#iterate() */ - public boolean iterate(){ + public boolean iterate() { final TransportListener tl; - synchronized(mutex){ - tl = transportListener; - if( !started || disposed || tl==null ) - return false; + synchronized (mutex) { + tl = transportListener; + if (!started || disposed || tl == null) + return false; } - + LinkedBlockingQueue mq = getMessageQueue(); - final Command command = (Command)mq.poll(); - if( command!=null ) { + final Command command = (Command)mq.poll(); + if (command != null) { tl.onCommand(command); return !mq.isEmpty(); } else { - return false; - } + return false; + } } /** * @return the async */ - public boolean isAsync(){ + public boolean isAsync() { return async; } /** * @param async the async to set */ - public void setAsync(boolean async){ - this.async=async; + public void setAsync(boolean async) { + this.async = async; } /** * @return the asyncQueueDepth */ - public int getAsyncQueueDepth(){ + public int getAsyncQueueDepth() { return asyncQueueDepth; } /** * @param asyncQueueDepth the asyncQueueDepth to set */ - public void setAsyncQueueDepth(int asyncQueueDepth){ - this.asyncQueueDepth=asyncQueueDepth; + public void setAsyncQueueDepth(int asyncQueueDepth) { + this.asyncQueueDepth = asyncQueueDepth; } - protected void wakeup(){ - if(async){ - synchronized(mutex){ - if(taskRunner==null){ - taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString()); + protected void wakeup() { + if (async) { + synchronized (mutex) { + if (taskRunner == null) { + taskRunner = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString()); } } - try{ + try { taskRunner.wakeup(); - }catch(InterruptedException e){ + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } - public boolean isFaultTolerant(){ + public boolean isFaultTolerant() { return false; } }