activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961078 - in /activemq/sandbox/activemq-apollo-actor: activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java
Date Wed, 07 Jul 2010 03:45:26 GMT
Author: chirino
Date: Wed Jul  7 03:45:26 2010
New Revision: 961078

URL: http://svn.apache.org/viewvc?rev=961078&view=rev
Log:
Optimizing the TCP transport; it is now on par with the performance of the hawtdispatch stomp example.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961078&r1=961077&r2=961078&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:45:26 2010
@@ -20,6 +20,7 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.ByteArrayOutputStream;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
@@ -75,23 +76,22 @@ public class TcpTransport implements Tra
     private DispatchSource readSource;
     private DispatchSource writeSource;
 
+    int bufferSize = 1024*64;
+
     final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
-    int outboundSize = 0;
-    int maxOutbound = 1024 * 32;
-    ByteBuffer outbound_frame;
+    DataByteArrayOutputStream next_outbound_buffer;
+    ByteBuffer outbound_buffer;
     protected boolean useLocalHost = true;
-
-    int READ_BUFFFER_SIZE = 1024 * 32;
-    ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 32);
+    ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
 
 
     static final class OneWay {
-        final Buffer buffer;
+        final Object command;
         final Retained retained;
 
-        public OneWay(Buffer buffer, Retained retained) {
+        public OneWay(Object command, Retained retained) {
+            this.command = command;
             this.retained = retained;
-            this.buffer = buffer;
         }
     }
 
@@ -140,6 +140,11 @@ public class TcpTransport implements Tra
             channel = SocketChannel.open();
         }
         channel.configureBlocking(false);
+        channel.socket().setSendBufferSize(bufferSize);
+        channel.socket().setReceiveBufferSize(bufferSize);
+        next_outbound_buffer = new DataByteArrayOutputStream(bufferSize);
+        outbound_buffer = ByteBuffer.allocate(0);
+
         if (socketState == CONNECTING) {
 
             if (localLocation != null) {
@@ -186,8 +191,8 @@ public class TcpTransport implements Tra
     private void fireConnected() {
 
         try {
-            channel.socket().setSendBufferSize(maxOutbound);
-            channel.socket().setReceiveBufferSize(maxOutbound);
+            channel.socket().setSendBufferSize(bufferSize);
+            channel.socket().setReceiveBufferSize(bufferSize);
         } catch (SocketException e) {
         }
 
@@ -235,7 +240,7 @@ public class TcpTransport implements Tra
 
 
     public boolean isFull() {
-        return outboundSize >= maxOutbound;
+        return next_outbound_buffer.size() >= bufferSize>>2;
     }
 
     public void oneway(Object command, Retained retained) {
@@ -244,35 +249,29 @@ public class TcpTransport implements Tra
             if (socketState != CONNECTED) {
                 throw new IOException("Not connected.");
             }
+            if (transportState != RUNNING) {
+                throw new IOException("Not running.");
+            }
         } catch (IOException e) {
             listener.onTransportFailure(e);
         }
 
-        // Marshall the command.
-        Buffer buffer = null;
-        try {
-            buffer = wireformat.marshal(command);
-        } catch (IOException e) {
-            listener.onTransportFailure(e);
-            return;
-        }
-
-        OneWay oneway;
+        boolean wasEmpty = next_outbound_buffer.size()==0;
         if (retained!=null && isFull() ) {
             // retaining blocks the sender it is released.
             retained.retain();
-            oneway = new OneWay(buffer, retained);
+            outbound.add(new OneWay(command, retained));
         } else {
-            oneway = new OneWay(buffer, null);
+            try {
+                wireformat.marshal(command, next_outbound_buffer);
+            } catch (IOException e) {
+                listener.onTransportFailure(e);
+            }
+            if ( outbound_buffer.remaining()==0 ) {
+                writeSource.resume();
+            }
         }
-        outbound.add(oneway);
-        outboundSize += buffer.length;
 
-        // wait for write ready events if this write
-        // cannot be drained.
-        if (outbound.size() == 1 && !drainOutbound()) {
-            writeSource.resume();
-        }
     }
 
     /**
@@ -284,56 +283,49 @@ public class TcpTransport implements Tra
             while (socketState == CONNECTED) {
 
                 // if we have a pending write that is being sent over the socket...
-                if (outbound_frame != null) {
-
-                    channel.write(outbound_frame);
-                    if (outbound_frame.remaining() != 0) {
+                if (outbound_buffer.remaining()!=0) {
+                    channel.write(outbound_buffer);
+                    if (outbound_buffer.remaining() != 0) {
                         return false;
-                    } else {
-                        outbound_frame = null;
                     }
-
                 } else {
-
-                    // marshall all the available frames..
-                    OneWay oneWay = outbound.poll();
-
-                    int size = 0;
-                    ArrayList<Buffer> buffers = new ArrayList<Buffer>(outbound.size());
-                    while (oneWay != null) {
-                        size+=oneWay.buffer.length;
-                        buffers.add(oneWay.buffer);
-                        if (oneWay.retained != null) {
-                            oneWay.retained.release();
-                        }
-                        if (size < maxOutbound) {
-                            oneWay = outbound.poll();
-                        } else {
-                            oneWay = null;
+                    if( next_outbound_buffer.size()!=0) {
+                        // size of next buffer is based on how much was used in the previous buffer.
+                        int prev_size = Math.min(Math.max(outbound_buffer.position()+512, 512), bufferSize);
+                        outbound_buffer = next_outbound_buffer.toBuffer().toByteBuffer();
+                        next_outbound_buffer = new DataByteArrayOutputStream(prev_size);
+                    } else {
+                        // marshall all the available frames..
+                        OneWay oneWay = outbound.poll();
+                        while (oneWay != null) {
+                            try {
+                                wireformat.marshal(oneWay.command, next_outbound_buffer);
+                            } catch (IOException e) {
+                                listener.onTransportFailure(e);
+                            }
+                            if (oneWay.retained != null) {
+                                oneWay.retained.release();
+                            }
+                            if ( isFull() ) {
+                                oneWay = null;
+                            } else {
+                                oneWay = outbound.poll();
+                            }
                         }
-                    }
 
-                    if (size == 0) {
-                        // the source is now drained...
-                        return true;
-                    } else {
-                        // Make the write just one big buffer.
-                        outboundSize -= size;
-                        ByteArrayOutputStream buffer = new ByteArrayOutputStream(size);
-                        for (Buffer b : buffers) {
-                            buffer.write(b);
+                        if (next_outbound_buffer.size() == 0) {
+                            // the source is now drained...
+                            return true;
                         }
-                        outbound_frame = buffer.toBuffer().toByteBuffer();
                     }
                 }
-
             }
 
         } catch (IOException e) {
             listener.onTransportFailure(e);
         }
 
-        return outbound.isEmpty() && outbound_frame == null;
+        return outbound.isEmpty() && outbound_buffer == null;
     }
 
 
@@ -350,7 +342,7 @@ public class TcpTransport implements Tra
                 if (readBuffer.remaining() == 0) {
 
                     // double the capacity size if needed...
-                    int new_capacity = unmarshalSession.getStartPos() != 0 ? READ_BUFFFER_SIZE : readBuffer.capacity() << 2;
+                    int new_capacity = unmarshalSession.getStartPos() != 0 ? bufferSize : readBuffer.capacity() << 2;
                     byte[] new_buffer = new byte[new_capacity];
 
                     // If there was un-consummed data.. move it to the start of the new buffer.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java?rev=961078&r1=961077&r2=961078&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/DataByteArrayOutputStream.java Wed Jul  7 03:45:26 2010
@@ -39,11 +39,18 @@ public class DataByteArrayOutputStream e
      * @exception IllegalArgumentException if size is negative.
      */
     public DataByteArrayOutputStream(int size) {
-        if (size < 0) {
+        if (size <= 0) {
             throw new IllegalArgumentException("Invalid size: " + size);
         }
         buf = new byte[size];
     }
+    
+    public DataByteArrayOutputStream(byte buf[]) {
+        if ( buf == null || buf.length==0 ) {
+            throw new IllegalArgumentException("Invalid buffer");
+        }
+        this.buf = buf;
+    }
 
     /**
      * Creates a new byte array output stream.



Mime
View raw message