activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1058996 - in /activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp: SslTransport.java TcpTransport.java
Date Fri, 14 Jan 2011 13:09:47 GMT
Author: chirino
Date: Fri Jan 14 13:09:47 2011
New Revision: 1058996

URL: http://svn.apache.org/viewvc?rev=1058996&view=rev
Log:
Adding option to limit the read/write rate of the tcp transports.

Modified:
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java?rev=1058996&r1=1058995&r2=1058996&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/SslTransport.java Fri Jan 14 13:09:47 2011
@@ -39,6 +39,7 @@ public class SslTransport extends TcpTra
     private boolean writeFlushing;
 
     private ByteBuffer readOverflowBuffer;
+    private SSLChannel ssl_channel = new SSLChannel();
 
     public void setSSLContext(SSLContext ctx) {
         this.sslContext = ctx;
@@ -55,11 +56,11 @@ public class SslTransport extends TcpTra
         }
 
         public boolean isOpen() {
-            return channel.isOpen();
+            return getSocketChannel().isOpen();
         }
 
         public void close() throws IOException {
-            channel.close();
+            getSocketChannel().close();
         }
     }
 
@@ -85,14 +86,6 @@ public class SslTransport extends TcpTra
     }
 
     @Override
-    protected void initializeCodec() {
-        SSLChannel channel = new SSLChannel();
-        codec.setReadableByteChannel(channel);
-        codec.setWritableByteChannel(channel);
-    }
-
-
-    @Override
     public void connecting(URI remoteLocation, URI localLocation) throws Exception {
         assert engine == null;
         engine = sslContext.createSSLEngine();
@@ -113,8 +106,6 @@ public class SslTransport extends TcpTra
         writeBuffer = ByteBuffer.allocateDirect(session.getPacketBufferSize());
 
         super.connected(channel);
-
-
     }
 
     @Override
@@ -146,7 +137,7 @@ public class SslTransport extends TcpTra
     protected boolean flush() throws IOException {
         while (true) {
             if(writeFlushing) {
-                channel.write(writeBuffer);
+                super.writeChannel().write(writeBuffer);
                 if( !writeBuffer.hasRemaining() ) {
                     writeBuffer.clear();
                     writeFlushing = false;
@@ -200,7 +191,7 @@ public class SslTransport extends TcpTra
                     return rc;
                 }
             } else if( readUnderflow ) {
-                int count = channel.read(readBuffer);
+                int count = super.readChannel().read(readBuffer);
                 if( count == -1 ) {  // peer closed socket.
                     if (rc==0) {
                         engine.closeInbound();
@@ -308,6 +299,15 @@ public class SslTransport extends TcpTra
         }
     }
 
+
+    /**
+     * Returns this transport's {@code ssl_channel} as the channel to read from,
+     * replacing the channel the superclass would hand out.
+     * {@code super.readChannel()} remains available for access to the
+     * superclass's channel.
+     *
+     * @return the {@code ssl_channel} field; the same instance on every call
+     */
+    @Override
+    public ReadableByteChannel readChannel() {
+        return ssl_channel;
+    }
+
+    /**
+     * Returns this transport's {@code ssl_channel} as the channel to write to,
+     * replacing the channel the superclass would hand out.
+     * {@code super.writeChannel()} remains available for access to the
+     * superclass's channel.
+     *
+     * @return the {@code ssl_channel} field; the same instance on every call
+     */
+    @Override
+    public WritableByteChannel writeChannel() {
+        return ssl_channel;
+    }
+
 }
 
 

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1058996&r1=1058995&r2=1058996&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Fri Jan 14 13:09:47 2011
@@ -27,13 +27,16 @@ import org.fusesource.hawtdispatch.Dispa
 import org.fusesource.hawtdispatch.DispatchSource;
 import org.fusesource.hawtdispatch.Retained;
 
-import javax.net.ssl.SSLException;
 import java.io.IOException;
 import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * An implementation of the {@link org.apache.activemq.apollo.transport.Transport} interface using raw tcp/ip
@@ -176,6 +179,78 @@ public class TcpTransport extends JavaBa
     protected boolean useLocalHost = true;
     protected boolean full = false;
 
+    int max_read_rate;
+    int max_write_rate;
+    protected RateLimitingChannel rateLimitingChannel;
+
+    /**
+     * Wraps the transport's {@code channel} and caps how many bytes may be read
+     * and written between two calls to {@link #resetAllowance()}.
+     * <p>
+     * A {@code max_read_rate} / {@code max_write_rate} of {@code 0} disables
+     * the cap for that direction and the call goes straight to {@code channel}.
+     * <p>
+     * NOTE(review): once an allowance is used up, {@code read}/{@code write}
+     * return 0 until the next reset; whether the caller retries after the reset
+     * is not visible here -- confirm.
+     */
+    class RateLimitingChannel implements ReadableByteChannel, WritableByteChannel {
+
+        // Bytes that may still be transferred before the next resetAllowance().
+        int read_allowance = max_read_rate;
+        int write_allowance = max_write_rate;
+
+        /** Restores both allowances to the currently configured maximum rates. */
+        public void resetAllowance() {
+            read_allowance = max_read_rate;
+            write_allowance = max_write_rate;
+        }
+
+        /**
+         * Reads from {@code channel} into {@code dst}, never more than the
+         * remaining read allowance, and charges the bytes read against it.
+         *
+         * @param dst buffer to fill; its limit is lowered for the duration of
+         *            the call when needed and always restored afterwards
+         * @return 0 if the allowance is exhausted or {@code dst} has no room,
+         *         otherwise whatever {@code channel.read} returns
+         * @throws IOException if the underlying read fails
+         */
+        public int read(ByteBuffer dst) throws IOException {
+            if( max_read_rate==0 ) {
+                return channel.read(dst);
+            } else {
+                int remaining = dst.remaining();
+                if( read_allowance ==0 || remaining ==0 ) {
+                    return 0;
+                }
+
+                // Temporarily shrink the buffer so the channel cannot read past the allowance.
+                int reduction = 0;
+                if( remaining > read_allowance) {
+                    reduction = remaining - read_allowance;
+                    dst.limit(dst.limit() - reduction);
+                }
+                try {
+                    int rc = channel.read(dst);
+                    // Consume the allowance; without this the limit was never enforced.
+                    // Non-positive results (nothing read / end of stream) cost nothing.
+                    if( rc > 0 ) {
+                        read_allowance -= rc;
+                    }
+                    return rc;
+                } finally {
+                    if( reduction!=0 ) {
+                        dst.limit(dst.limit() + reduction);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Writes from {@code src} to {@code channel}, never more than the
+         * remaining write allowance, and charges the bytes written against it.
+         *
+         * @param src buffer to drain; its limit is lowered for the duration of
+         *            the call when needed and always restored afterwards
+         * @return 0 if the allowance is exhausted or {@code src} is empty,
+         *         otherwise whatever {@code channel.write} returns
+         * @throws IOException if the underlying write fails
+         */
+        public int write(ByteBuffer src) throws IOException {
+            if( max_write_rate==0 ) {
+                return channel.write(src);
+            } else {
+                int remaining = src.remaining();
+                if( write_allowance ==0 || remaining ==0 ) {
+                    return 0;
+                }
+
+                // Temporarily shrink the buffer so the channel cannot write past the allowance.
+                int reduction = 0;
+                if( remaining > write_allowance) {
+                    reduction = remaining - write_allowance;
+                    src.limit(src.limit() - reduction);
+                }
+                try {
+                    int rc = channel.write(src);
+                    // Consume the allowance; without this the limit was never enforced.
+                    if( rc > 0 ) {
+                        write_allowance -= rc;
+                    }
+                    return rc;
+                } finally {
+                    if( reduction!=0 ) {
+                        src.limit(src.limit() + reduction);
+                    }
+                }
+            }
+        }
+
+        /** Reports whether the wrapped {@code channel} is open. */
+        public boolean isOpen() {
+            return channel.isOpen();
+        }
+
+        /** Closes the wrapped {@code channel}. */
+        public void close() throws IOException {
+            channel.close();
+        }
+
+    }
+
     private final Runnable CANCEL_HANDLER = new Runnable() {
         public void run() {
             socketState.onCanceled();
@@ -208,8 +283,8 @@ public class TcpTransport extends JavaBa
     }
 
     protected void initializeCodec() {
-        codec.setReadableByteChannel(this.channel);
-        codec.setWritableByteChannel(this.channel);
+        codec.setReadableByteChannel(readChannel());
+        codec.setWritableByteChannel(writeChannel());
     }
 
     public void connecting(URI remoteLocation, URI localLocation) throws IOException, Exception
{
@@ -325,11 +400,26 @@ public class TcpTransport extends JavaBa
             }
         });
 
+        if( max_read_rate!=0 || max_write_rate!=0 ) {
+            rateLimitingChannel = new RateLimitingChannel();
+            schedualRateAllowanceReset();
+        }
+
         remoteAddress = channel.socket().getRemoteSocketAddress().toString();
         listener.onTransportConnected();
     }
 
-
+    /**
+     * Submits a task to {@code dispatchQueue} with a one-second delay. When the
+     * task runs and the socket is still in the {@code CONNECTED} state, it
+     * refills the rate limiter's allowances and schedules itself again;
+     * otherwise it does nothing and the cycle ends.
+     */
+    private void schedualRateAllowanceReset() {
+        final Runnable refill = new Runnable(){
+            public void run() {
+                if( socketState.is(CONNECTED.class) ) {
+                    rateLimitingChannel.resetAllowance();
+                    schedualRateAllowanceReset();
+                }
+            }
+        };
+        dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, refill);
+    }
 
     private void dispose() {
 
@@ -550,4 +640,36 @@ public class TcpTransport extends JavaBa
         return channel;
     }
 
+    /**
+     * Selects the channel to read from.
+     *
+     * @return {@code rateLimitingChannel} when one has been created,
+     *         otherwise the plain {@code channel}
+     */
+    public ReadableByteChannel readChannel() {
+        if(rateLimitingChannel==null) {
+            return channel;
+        }
+        return rateLimitingChannel;
+    }
+
+    /**
+     * Selects the channel to write to.
+     *
+     * @return {@code rateLimitingChannel} when one has been created,
+     *         otherwise the plain {@code channel}
+     */
+    public WritableByteChannel writeChannel() {
+        if(rateLimitingChannel==null) {
+            return channel;
+        }
+        return rateLimitingChannel;
+    }
+
+    /**
+     * @return the configured read limit; {@code 0} means reads are not limited
+     */
+    public int getMax_read_rate() {
+        return this.max_read_rate;
+    }
+
+    /**
+     * Sets the read limit.
+     *
+     * @param max_read_rate the new limit; {@code 0} means reads are not limited
+     */
+    public void setMax_read_rate(int max_read_rate) {
+        this.max_read_rate = max_read_rate;
+    }
+
+    /**
+     * @return the configured write limit; {@code 0} means writes are not limited
+     */
+    public int getMax_write_rate() {
+        return this.max_write_rate;
+    }
+
+    /**
+     * Sets the write limit.
+     *
+     * @param max_write_rate the new limit; {@code 0} means writes are not limited
+     */
+    public void setMax_write_rate(int max_write_rate) {
+        this.max_write_rate = max_write_rate;
+    }
+
 }



Mime
View raw message