activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiram Chirino (JIRA)" <j...@apache.org>
Subject [jira] Updated: (AMQ-2139) Batch up multiple socket write calls in the TCP transport.
Date Mon, 23 Feb 2009 16:26:59 GMT

     [ https://issues.apache.org/activemq/browse/AMQ-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Hiram Chirino updated AMQ-2139:
-------------------------------

    Description: 
Investigate using an async write thread for the TCP transport.  It would be able to more efficiently
batch up multiple writes into a single socket write. 

Below is a patch that should be investigated.  It should increase write performance of the
TCP transport:

{code}
$ svn diff
Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(revision
742546)
+++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(working
copy)
@@ -29,7 +29,9 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -119,6 +121,9 @@
     private Boolean tcpNoDelay;
     private Thread runnerThread;
 
+    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
+    private Thread onewayThread;
+
     /**
      * Connect to a remote Node - e.g. a Broker
      * 
@@ -157,16 +162,39 @@
         this.localLocation = null;
         setDaemon(true);
     }
-
+    
     /**
      * A one way asynchronous send
      */
     public void oneway(Object command) throws IOException {
         checkStarted();
-        wireFormat.marshal(command, dataOut);
-        dataOut.flush();
+        try {
+            outbound.put(command);
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
     }
 
+    protected void sendOneways() {
+        try {
+            while(!isStopped()) {
+                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+                if( command!=null ) {
+                    try {
+                        while( command!=null ) {
+                            wireFormat.marshal(command, dataOut);
+                            command = outbound.poll();
+                        }
+                        dataOut.flush();
+                    } catch (IOException e) {
+                        getTransportListener().onException(e);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+        }
+    }
+
     /**
      * @return pretty print of 'this'
      */
@@ -399,6 +427,11 @@
 
     protected void doStart() throws Exception {
         connect();
+        onewayThread = new Thread(null, new Runnable(){
+            public void run() {
+                sendOneways();
+            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
+        onewayThread.start();
         stoppedLatch.set(new CountDownLatch(1));
         super.doStart();
     }
@@ -487,8 +520,12 @@
                     LOG.debug("Caught exception closing socket",e);
                 }
             }
-           
         }
+        if( onewayThread!=null ) {
+            onewayThread.join();
+            onewayThread = null;
+            outbound.clear();
+        }
     }
 
     /**
{code}

  was:
Investigate using an async write thread for the TCP transport.  It would be able to more efficiently
batch up multiple writes into a single socket write. 

{code}
$ svn diff
Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(revision
742546)
+++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(working
copy)
@@ -29,7 +29,9 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -119,6 +121,9 @@
     private Boolean tcpNoDelay;
     private Thread runnerThread;
 
+    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
+    private Thread onewayThread;
+
     /**
      * Connect to a remote Node - e.g. a Broker
      * 
@@ -157,16 +162,39 @@
         this.localLocation = null;
         setDaemon(true);
     }
-
+    
     /**
      * A one way asynchronous send
      */
     public void oneway(Object command) throws IOException {
         checkStarted();
-        wireFormat.marshal(command, dataOut);
-        dataOut.flush();
+        try {
+            outbound.put(command);
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
     }
 
+    protected void sendOneways() {
+        try {
+            while(!isStopped()) {
+                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+                if( command!=null ) {
+                    try {
+                        while( command!=null ) {
+                            wireFormat.marshal(command, dataOut);
+                            command = outbound.poll();
+                        }
+                        dataOut.flush();
+                    } catch (IOException e) {
+                        getTransportListener().onException(e);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+        }
+    }
+
     /**
      * @return pretty print of 'this'
      */
@@ -399,6 +427,11 @@
 
     protected void doStart() throws Exception {
         connect();
+        onewayThread = new Thread(null, new Runnable(){
+            public void run() {
+                sendOneways();
+            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
+        onewayThread.start();
         stoppedLatch.set(new CountDownLatch(1));
         super.doStart();
     }
@@ -487,8 +520,12 @@
                     LOG.debug("Caught exception closing socket",e);
                 }
             }
-           
         }
+        if( onewayThread!=null ) {
+            onewayThread.join();
+            onewayThread = null;
+            outbound.clear();
+        }
     }
 
     /**
{code}


> Batch up multiple socket write calls in the TCP transport.
> ----------------------------------------------------------
>
>                 Key: AMQ-2139
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2139
>             Project: ActiveMQ
>          Issue Type: Improvement
>          Components: Transport
>    Affects Versions: 5.2.0
>            Reporter: Hiram Chirino
>            Assignee: Hiram Chirino
>             Fix For: 6.0.0
>
>
> Investigate using an async write thread for the TCP transport.  It would be able to more
efficiently batch up multiple writes into a single socket write. 
> Below is a patch that should be investigated.  It should increase write performance
of the TCP transport:
> {code}
> $ svn diff
> Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(revision
742546)
> +++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(working
copy)
> @@ -29,7 +29,9 @@
>  import java.net.UnknownHostException;
>  import java.util.HashMap;
>  import java.util.Map;
> +import java.util.concurrent.ArrayBlockingQueue;
>  import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.LinkedBlockingQueue;
>  import java.util.concurrent.SynchronousQueue;
>  import java.util.concurrent.ThreadFactory;
>  import java.util.concurrent.ThreadPoolExecutor;
> @@ -119,6 +121,9 @@
>      private Boolean tcpNoDelay;
>      private Thread runnerThread;
>  
> +    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
> +    private Thread onewayThread;
> +
>      /**
>       * Connect to a remote Node - e.g. a Broker
>       * 
> @@ -157,16 +162,39 @@
>          this.localLocation = null;
>          setDaemon(true);
>      }
> -
> +    
>      /**
>       * A one way asynchronous send
>       */
>      public void oneway(Object command) throws IOException {
>          checkStarted();
> -        wireFormat.marshal(command, dataOut);
> -        dataOut.flush();
> +        try {
> +            outbound.put(command);
> +        } catch (InterruptedException e) {
> +            throw new InterruptedIOException();
> +        }
>      }
>  
> +    protected void sendOneways() {
> +        try {
> +            while(!isStopped()) {
> +                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
> +                if( command!=null ) {
> +                    try {
> +                        while( command!=null ) {
> +                            wireFormat.marshal(command, dataOut);
> +                            command = outbound.poll();
> +                        }
> +                        dataOut.flush();
> +                    } catch (IOException e) {
> +                        getTransportListener().onException(e);
> +                    }
> +                }
> +            }
> +        } catch (InterruptedException e) {
> +        }
> +    }
> +
>      /**
>       * @return pretty print of 'this'
>       */
> @@ -399,6 +427,11 @@
>  
>      protected void doStart() throws Exception {
>          connect();
> +        onewayThread = new Thread(null, new Runnable(){
> +            public void run() {
> +                sendOneways();
> +            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
> +        onewayThread.start();
>          stoppedLatch.set(new CountDownLatch(1));
>          super.doStart();
>      }
> @@ -487,8 +520,12 @@
>                      LOG.debug("Caught exception closing socket",e);
>                  }
>              }
> -           
>          }
> +        if( onewayThread!=null ) {
> +            onewayThread.join();
> +            onewayThread = null;
> +            outbound.clear();
> +        }
>      }
>  
>      /**
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message