Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 10814 invoked from network); 18 May 2010 15:21:01 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 18 May 2010 15:21:01 -0000 Received: (qmail 62819 invoked by uid 500); 18 May 2010 15:21:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 62759 invoked by uid 500); 18 May 2010 15:21:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 62747 invoked by uid 99); 18 May 2010 15:21:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 May 2010 15:21:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 May 2010 15:20:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7BD0F238890A; Tue, 18 May 2010 15:20:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r945692 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/nio/ main/java/org/apache/activemq/transport/stomp/ main/java/org/apache/activemq/transport/tcp/ test/java/org/... Date: Tue, 18 May 2010 15:20:34 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100518152035.7BD0F238890A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Tue May 18 15:20:33 2010 New Revision: 945692 URL: http://svn.apache.org/viewvc?rev=945692&view=rev Log: resolve https://issues.apache.org/activemq/browse/AMQ-2737 - have nio work with soWritetTimeout filter Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Tue May 18 15:20:33 2010 @@ -26,6 +26,7 @@ import java.util.concurrent.locks.Condit import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; +import org.apache.activemq.transport.tcp.TimeStampStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,8 +98,8 @@ public class WriteTimeoutFilter extends } - protected TcpBufferedOutputStream getWriter() { - return next.narrow(TcpBufferedOutputStream.class); + protected TimeStampStream getWriter() { + return next.narrow(TimeStampStream.class); } protected Socket getSocket() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Tue May 18 15:20:33 2010 @@ -23,13 +23,15 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import org.apache.activemq.transport.tcp.TimeStampStream; + /** * An optimized buffered outputstream for Tcp * * @version $Revision: 1.1.1.1 $ */ -public class NIOOutputStream extends OutputStream { +public class NIOOutputStream extends OutputStream implements TimeStampStream { private static final int BUFFER_SIZE = 8192; @@ -39,6 +41,7 @@ public class NIOOutputStream extends Out private int count; private boolean closed; + private volatile long writeTimestamp = -1;//concurrent reads of this value /** * Constructor @@ -149,31 +152,51 @@ public class NIOOutputStream extends Out int remaining = data.remaining(); int lastRemaining = remaining - 1; long delay = 1; - while (remaining > 0) { - - // We may need to do a little bit of sleeping to avoid a busy loop. - // Slow down if no data was written out.. - if (remaining == lastRemaining) { - try { - // Use exponential rollback to increase sleep time. - Thread.sleep(delay); - delay *= 2; - if (delay > 1000) { - delay = 1000; + try { + writeTimestamp = System.currentTimeMillis(); + while (remaining > 0) { + + // We may need to do a little bit of sleeping to avoid a busy loop. + // Slow down if no data was written out.. + if (remaining == lastRemaining) { + try { + // Use exponential rollback to increase sleep time. + Thread.sleep(delay); + delay *= 2; + if (delay > 1000) { + delay = 1000; + } + } catch (InterruptedException e) { + throw new InterruptedIOException(); } - } catch (InterruptedException e) { - throw new InterruptedIOException(); + } else { + delay = 1; } - } else { - delay = 1; - } - lastRemaining = remaining; + lastRemaining = remaining; - // Since the write is non-blocking, all the data may not have been - // written. - out.write(data); - remaining = data.remaining(); + // Since the write is non-blocking, all the data may not have been + // written. + out.write(data); + remaining = data.remaining(); + } + } finally { + writeTimestamp = -1; } } + + + /* (non-Javadoc) + * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() + */ + public boolean isWriting() { + return writeTimestamp > 0; + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() + */ + public long getWriteTimestamp() { + return writeTimestamp; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Tue May 18 15:20:33 2010 @@ -83,8 +83,9 @@ public class NIOTransport extends TcpTra currentBuffer = inputBuffer; nextFrameSize = -1; currentBuffer.limit(4); - this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024)); - + NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024); + this.dataOut = new DataOutputStream(outPutStream); + this.buffOut = outPutStream; } private void serviceRead() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue May 18 15:20:33 2010 @@ -82,19 +82,26 @@ public class StompConnection { throw new IOException("socket closed."); } else if (c == 0) { c = is.read(); - if (c != '\n') { - throw new IOException("Expecting stomp frame to terminate with \0\n"); + if (c == '\n') { + // end of frame + return stringFromBuffer(inputBuffer); + } else { + inputBuffer.write(0); + inputBuffer.write(c); } - byte[] ba = inputBuffer.toByteArray(); - inputBuffer.reset(); - return new String(ba, "UTF-8"); } else { inputBuffer.write(c); } } } - public Socket getStompSocket() { + private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception { + byte[] ba = inputBuffer.toByteArray(); + inputBuffer.reset(); + return new String(ba, "UTF-8"); + } + + public Socket getStompSocket() { return stompSocket; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Tue May 18 15:20:33 2010 @@ -83,7 +83,9 @@ public class StompNIOTransport extends T }); inputBuffer = ByteBuffer.allocate(8 * 1024); - this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024)); + NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024); + this.dataOut = new DataOutputStream(outPutStream); + this.buffOut = outPutStream; } private void serviceRead() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Tue May 18 15:20:33 2010 @@ -27,7 +27,7 @@ import java.io.OutputStream; * @version $Revision: 1.1.1.1 $ */ -public class TcpBufferedOutputStream extends FilterOutputStream { +public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream { private static final int BUFFER_SIZE = 8192; private byte[] buffer; private int bufferlen; @@ -129,10 +129,16 @@ public class TcpBufferedOutputStream ext super.close(); } + /* (non-Javadoc) + * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() + */ public boolean isWriting() { return writeTimestamp > 0; } + /* (non-Javadoc) + * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() + */ public long getWriteTimestamp() { return writeTimestamp; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue May 18 15:20:33 2010 @@ -67,7 +67,7 @@ public class TcpTransport extends Transp protected Socket socket; protected DataOutputStream dataOut; protected DataInputStream dataIn; - protected TcpBufferedOutputStream buffOut = null; + protected TimeStampStream buffOut = null; /** * The Traffic Class to be set on the socket. */ @@ -576,8 +576,9 @@ public class TcpTransport extends Transp } }; this.dataIn = new DataInputStream(buffIn); - buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); - this.dataOut = new DataOutputStream(buffOut); + TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); + this.dataOut = new DataOutputStream(outputStream); + this.buffOut = outputStream; } protected void closeStreams() throws IOException { @@ -604,7 +605,7 @@ public class TcpTransport extends Transp public T narrow(Class target) { if (target == Socket.class) { return target.cast(socket); - } else if ( target == TcpBufferedOutputStream.class) { + } else if ( target == TimeStampStream.class) { return target.cast(buffOut); } return super.narrow(target); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java Tue May 18 15:20:33 2010 @@ -47,6 +47,7 @@ public class JmsTestSupport extends Comb static final private AtomicLong TEST_COUNTER = new AtomicLong(); public String userName; public String password; + public String messageTextPrefix = ""; protected ConnectionFactory factory; protected ActiveMQConnection connection; @@ -96,7 +97,7 @@ public class JmsTestSupport extends Comb protected void sendMessages(Session session, Destination destination, int count) throws JMSException { MessageProducer producer = session.createProducer(destination); for (int i = 0; i < count; i++) { - producer.send(session.createTextMessage("" + i)); + producer.send(session.createTextMessage(messageTextPrefix + i)); } producer.close(); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=945692&r1=945691&r2=945692&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Tue May 18 15:20:33 2010 @@ -20,6 +20,7 @@ package org.apache.activemq.util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; @@ -38,10 +39,11 @@ public class SocketProxy { private static final transient Log LOG = LogFactory.getLog(SocketProxy.class); - public static final int ACCEPT_TIMEOUT_MILLIS = 1000; + public static final int ACCEPT_TIMEOUT_MILLIS = 100; private URI proxyUrl; private URI target; + private Acceptor acceptor; private ServerSocket serverSocket; @@ -49,6 +51,11 @@ public class SocketProxy { private int listenPort = 0; + private int receiveBufferSize = -1; + + public SocketProxy() throws Exception { + } + public SocketProxy(URI uri) throws Exception { this(0, uri); } @@ -59,12 +66,24 @@ public class SocketProxy { open(); } - protected void open() throws Exception { + public void setReceiveBufferSize(int receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public void setTarget(URI tcpBrokerUri) { + target = tcpBrokerUri; + } + + public void open() throws Exception { + serverSocket = new ServerSocket(); + if (receiveBufferSize > 0) { + serverSocket.setReceiveBufferSize(receiveBufferSize); + } if (proxyUrl == null) { - serverSocket = new ServerSocket(listenPort); + serverSocket.bind(new InetSocketAddress(listenPort)); proxyUrl = urlFromSocket(target, serverSocket); } else { - serverSocket = new ServerSocket(proxyUrl.getPort()); + serverSocket.bind(new InetSocketAddress(proxyUrl.getPort())); } acceptor = new Acceptor(serverSocket, target); new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start(); @@ -151,9 +170,13 @@ public class SocketProxy { public Connection(Socket socket, URI target) throws Exception { receiveSocket = socket; - sendSocket = new Socket(target.getHost(), target.getPort()); + sendSocket = new Socket(); + if (receiveBufferSize > 0) { + sendSocket.setReceiveBufferSize(receiveBufferSize); + } + sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort())); linkWithThreads(receiveSocket, sendSocket); - LOG.info("proxy connection " + sendSocket); + LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize()); } public void goOn() { @@ -210,6 +233,7 @@ public class SocketProxy { while (true) { int len = in.read(buf); if (len == -1) { + LOG.debug("read eof from:" + src); break; } pause.get().await(); @@ -259,7 +283,12 @@ public class SocketProxy { pause.get().await(); try { Socket source = socket.accept(); - LOG.info("accepted " + source); + LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize()); + pause.get().await(); + if (receiveBufferSize > 0) { + source.setReceiveBufferSize(receiveBufferSize); + } + LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize()); synchronized(connections) { connections.add(new Connection(source, target)); }