activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1147638 - /activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Date Sun, 17 Jul 2011 14:43:09 GMT
Author: chirino
Date: Sun Jul 17 14:43:08 2011
New Revision: 1147638

URL: http://svn.apache.org/viewvc?rev=1147638&view=rev
Log:
Fixes a bug where, when we attempted to yield to other connections for processing, we were
not called back to resume draining.

Modified:
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1147638&r1=1147637&r2=1147638&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Sun Jul 17 14:43:08 2011
@@ -17,15 +17,12 @@
 package org.apache.activemq.apollo.transport.tcp;
 
 import org.apache.activemq.apollo.transport.ProtocolCodec;
+import org.apache.activemq.apollo.transport.Transport;
 import org.apache.activemq.apollo.transport.TransportListener;
 import org.apache.activemq.apollo.util.JavaBaseService;
-import org.apache.activemq.apollo.transport.Transport;
+import org.fusesource.hawtdispatch.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.DispatchSource;
-import org.fusesource.hawtdispatch.Retained;
 
 import java.io.IOException;
 import java.net.*;
@@ -35,7 +32,6 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -178,6 +174,7 @@ public class TcpTransport extends JavaBa
     protected DispatchQueue dispatchQueue;
     private DispatchSource readSource;
     private DispatchSource writeSource;
+    private CustomDispatchSource<Integer, Integer> yieldSource;
 
     protected boolean useLocalHost = true;
 
@@ -459,6 +456,14 @@ public class TcpTransport extends JavaBa
 
     protected void onConnected() throws IOException {
 
+        yieldSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue);
+        yieldSource.setEventHandler(new Runnable() {
+            public void run() {
+                drainInbound();
+            }
+        });
+        yieldSource.resume();
+
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
         writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
 
@@ -579,9 +584,9 @@ public class TcpTransport extends JavaBa
         }
         try {
             long initial = codec.getReadCounter();
-            // Only process upto 64k worth of data at a time so we can give
+            // Only process up to 4 x the read buffer (getReadBufferSize() << 2) worth of data at a time so we can give
             // other connections a chance to process their requests.
-            while( codec.getReadCounter()-initial < 1024*64 ) {
+            while( codec.getReadCounter()-initial < codec.getReadBufferSize()<<2 ) {
                 Object command = codec.read();
                 if ( command!=null ) {
                     try {
@@ -599,6 +604,7 @@ public class TcpTransport extends JavaBa
                     return;
                 }
             }
+            yieldSource.merge(1);
         } catch (IOException e) {
             onTransportFailure(e);
         }



Mime
View raw message