activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1211900 - in /activemq/activemq-apollo/trunk: apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/ apollo-util/src/main/scala/org/apache/activemq/apollo/util/
Date Thu, 08 Dec 2011 14:20:31 GMT
Author: chirino
Date: Thu Dec  8 14:20:31 2011
New Revision: 1211900

URL: http://svn.apache.org/viewvc?rev=1211900&view=rev
Log:
Improve send performance of TCP transport by allowing writes to batch before they get flushed.

Modified:
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1211900&r1=1211899&r2=1211900&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Thu Dec  8 14:20:31 2011
@@ -174,6 +174,7 @@ public class TcpTransport extends JavaBa
     protected DispatchQueue dispatchQueue;
     private DispatchSource readSource;
     private DispatchSource writeSource;
+    private CustomDispatchSource<Integer, Integer> drainOutboundSource;
     private CustomDispatchSource<Integer, Integer> yieldSource;
 
     protected boolean useLocalHost = true;
@@ -468,6 +469,13 @@ public class TcpTransport extends JavaBa
             }
         });
         yieldSource.resume();
+        drainOutboundSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue);
+        drainOutboundSource.setEventHandler(new Runnable() {
+            public void run() {
+                drainOutbound();
+            }
+        });
+        drainOutboundSource.resume();
 
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
         writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
@@ -528,6 +536,8 @@ public class TcpTransport extends JavaBa
         return codec==null || codec.full();
     }
 
+    boolean rejectingOffers;
+
     public boolean offer(Object command) {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
@@ -539,14 +549,12 @@ public class TcpTransport extends JavaBa
             }
 
             ProtocolCodec.BufferState rc = codec.write(command);
+            rejectingOffers = codec.full();
             switch (rc ) {
                 case FULL:
                     return false;
                 default:
-                    if( drained ) {
-                        drained = false;
-                        resumeWrite();
-                    }
+                    drainOutboundSource.merge(1);
                     return true;
             }
         } catch (IOException e) {
@@ -556,8 +564,8 @@ public class TcpTransport extends JavaBa
 
     }
 
+    boolean writeResumedForCodecFlush = false;
 
-    boolean drained = true;
     /**
      *
      */
@@ -568,10 +576,17 @@ public class TcpTransport extends JavaBa
         }
         try {
             if( codec.flush() == ProtocolCodec.BufferState.EMPTY && flush() ) {
-                if( !drained ) {
-                    drained = true;
+                if( writeResumedForCodecFlush) {
+                    writeResumedForCodecFlush = false;
                     suspendWrite();
-                    listener.onRefill();
+                }
+                rejectingOffers = false;
+                listener.onRefill();
+
+            } else {
+                if(!writeResumedForCodecFlush) {
+                    writeResumedForCodecFlush = true;
+                    resumeWrite();
                 }
             }
         } catch (IOException e) {
@@ -658,6 +673,7 @@ public class TcpTransport extends JavaBa
             }
         }
     }
+
     private void _resumeRead() {
         readSource.resume();
         dispatchQueue.execute(new Runnable(){
@@ -672,14 +688,10 @@ public class TcpTransport extends JavaBa
             writeSource.suspend();
         }
     }
+
     protected void resumeWrite() {
         if( isConnected() && writeSource!=null ) {
             writeSource.resume();
-            dispatchQueue.execute(new Runnable(){
-                public void run() {
-                    drainOutbound();
-                }
-            });
         }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala?rev=1211900&r1=1211899&r2=1211900&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala Thu Dec  8 14:20:31 2011
@@ -48,4 +48,5 @@ class IntCounter(private var value:Int =
     rc
   }
 
+  override def toString() = get().toString
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=1211900&r1=1211899&r2=1211900&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Thu Dec  8 14:20:31 2011
@@ -49,4 +49,5 @@ class LongCounter(private var value:Long
     rc
   }
 
+  override def toString() = get().toString
 }
\ No newline at end of file



Mime
View raw message