Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 35898 invoked from network); 7 Jul 2010 03:46:55 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:46:55 -0000 Received: (qmail 33597 invoked by uid 500); 7 Jul 2010 03:46:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 33538 invoked by uid 500); 7 Jul 2010 03:46:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 33531 invoked by uid 99); 7 Jul 2010 03:46:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:46:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:46:49 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A6D362388A8B; Wed, 7 Jul 2010 03:45:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961078 - in /activemq/sandbox/activemq-apollo-actor: activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java Date: Wed, 07 Jul 2010 03:45:26 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707034526.A6D362388A8B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 03:45:26 2010 New Revision: 961078 URL: http://svn.apache.org/viewvc?rev=961078&view=rev Log: optimizing tcp transport.. it's now on par /w the perf in the hawtdispatch stomp example Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961078&r1=961077&r2=961078&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:45:26 2010 @@ -20,6 +20,7 @@ import org.apache.activemq.transport.Tra import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.buffer.Buffer; import org.apache.activemq.util.buffer.ByteArrayOutputStream; +import org.apache.activemq.util.buffer.DataByteArrayOutputStream; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.hawtdispatch.DispatchQueue; @@ -75,23 +76,22 @@ public class TcpTransport implements Tra private DispatchSource readSource; private DispatchSource writeSource; + int bufferSize = 1024*64; + final LinkedList outbound = new LinkedList(); - int outboundSize = 0; - int maxOutbound = 1024 * 32; - ByteBuffer outbound_frame; + DataByteArrayOutputStream next_outbound_buffer; + ByteBuffer outbound_buffer; protected boolean useLocalHost = true; - - int READ_BUFFFER_SIZE = 1024 * 32; - ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 32); + ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize); static final class OneWay { - final Buffer buffer; + final Object command; final Retained retained; - public OneWay(Buffer buffer, Retained retained) { + public OneWay(Object command, Retained retained) { + this.command = command; this.retained = retained; - this.buffer = buffer; } } @@ -140,6 +140,11 @@ public class TcpTransport implements Tra channel = SocketChannel.open(); } channel.configureBlocking(false); + channel.socket().setSendBufferSize(bufferSize); + channel.socket().setReceiveBufferSize(bufferSize); + next_outbound_buffer = new DataByteArrayOutputStream(bufferSize); + outbound_buffer = ByteBuffer.allocate(0); + if (socketState == CONNECTING) { if (localLocation != null) { @@ -186,8 +191,8 @@ public class TcpTransport implements Tra private void fireConnected() { try { - channel.socket().setSendBufferSize(maxOutbound); - channel.socket().setReceiveBufferSize(maxOutbound); + channel.socket().setSendBufferSize(bufferSize); + channel.socket().setReceiveBufferSize(bufferSize); } catch (SocketException e) { } @@ -235,7 +240,7 @@ public class TcpTransport implements Tra public boolean isFull() { - return outboundSize >= maxOutbound; + return next_outbound_buffer.size() >= bufferSize>>2; } public void oneway(Object command, Retained retained) { @@ -244,35 +249,29 @@ public class TcpTransport implements Tra if (socketState != CONNECTED) { throw new IOException("Not connected."); } + if (transportState != RUNNING) { + throw new IOException("Not running."); + } } catch (IOException e) { listener.onTransportFailure(e); } - // Marshall the command. - Buffer buffer = null; - try { - buffer = wireformat.marshal(command); - } catch (IOException e) { - listener.onTransportFailure(e); - return; - } - - OneWay oneway; + boolean wasEmpty = next_outbound_buffer.size()==0; if (retained!=null && isFull() ) { // retaining blocks the sender it is released. retained.retain(); - oneway = new OneWay(buffer, retained); + outbound.add(new OneWay(command, retained)); } else { - oneway = new OneWay(buffer, null); + try { + wireformat.marshal(command, next_outbound_buffer); + } catch (IOException e) { + listener.onTransportFailure(e); + } + if ( outbound_buffer.remaining()==0 ) { + writeSource.resume(); + } } - outbound.add(oneway); - outboundSize += buffer.length; - // wait for write ready events if this write - // cannot be drained. - if (outbound.size() == 1 && !drainOutbound()) { - writeSource.resume(); - } } /** @@ -284,56 +283,49 @@ public class TcpTransport implements Tra while (socketState == CONNECTED) { // if we have a pending write that is being sent over the socket... - if (outbound_frame != null) { - - channel.write(outbound_frame); - if (outbound_frame.remaining() != 0) { + if (outbound_buffer.remaining()!=0) { + channel.write(outbound_buffer); + if (outbound_buffer.remaining() != 0) { return false; - } else { - outbound_frame = null; } - } else { - - // marshall all the available frames.. - OneWay oneWay = outbound.poll(); - - int size = 0; - ArrayList buffers = new ArrayList(outbound.size()); - while (oneWay != null) { - size+=oneWay.buffer.length; - buffers.add(oneWay.buffer); - if (oneWay.retained != null) { - oneWay.retained.release(); - } - if (size < maxOutbound) { - oneWay = outbound.poll(); - } else { - oneWay = null; + if( next_outbound_buffer.size()!=0) { + // size of next buffer is based on how much was used in the previous buffer. + int prev_size = Math.min(Math.max(outbound_buffer.position()+512, 512), bufferSize); + outbound_buffer = next_outbound_buffer.toBuffer().toByteBuffer(); + next_outbound_buffer = new DataByteArrayOutputStream(prev_size); + } else { + // marshall all the available frames.. + OneWay oneWay = outbound.poll(); + while (oneWay != null) { + try { + wireformat.marshal(oneWay.command, next_outbound_buffer); + } catch (IOException e) { + listener.onTransportFailure(e); + } + if (oneWay.retained != null) { + oneWay.retained.release(); + } + if ( isFull() ) { + oneWay = null; + } else { + oneWay = outbound.poll(); + } } - } - if (size == 0) { - // the source is now drained... - return true; - } else { - // Make the write just one big buffer. - outboundSize -= size; - ByteArrayOutputStream buffer = new ByteArrayOutputStream(size); - for (Buffer b : buffers) { - buffer.write(b); + if (next_outbound_buffer.size() == 0) { + // the source is now drained... + return true; } - outbound_frame = buffer.toBuffer().toByteBuffer(); } } - } } catch (IOException e) { listener.onTransportFailure(e); } - return outbound.isEmpty() && outbound_frame == null; + return outbound.isEmpty() && outbound_buffer == null; } @@ -350,7 +342,7 @@ public class TcpTransport implements Tra if (readBuffer.remaining() == 0) { // double the capacity size if needed... - int new_capacity = unmarshalSession.getStartPos() != 0 ? READ_BUFFFER_SIZE : readBuffer.capacity() << 2; + int new_capacity = unmarshalSession.getStartPos() != 0 ? bufferSize : readBuffer.capacity() << 2; byte[] new_buffer = new byte[new_capacity]; // If there was un-consummed data.. move it to the start of the new buffer. Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java?rev=961078&r1=961077&r2=961078&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java Wed Jul 7 03:45:26 2010 @@ -39,11 +39,18 @@ public class DataByteArrayOutputStream e * @exception IllegalArgumentException if size is negative. */ public DataByteArrayOutputStream(int size) { - if (size < 0) { + if (size <= 0) { throw new IllegalArgumentException("Invalid size: " + size); } buf = new byte[size]; } + + public DataByteArrayOutputStream(byte buf[]) { + if ( buf == null || buf.length==0 ) { + throw new IllegalArgumentException("Invalid buffer"); + } + this.buf = buf; + } /** * Creates a new byte array output stream.