activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5269
Date Thu, 10 Jul 2014 20:32:56 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk a498bffdd -> e957937f0


https://issues.apache.org/jira/browse/AMQ-5269

Use a selector-based accept when the transport socket has a selectable
channel, which is the case for all the NIO transports.  This can reduce the
time it takes to close down the transport, speeding up tests.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e957937f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e957937f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e957937f

Branch: refs/heads/trunk
Commit: e957937f08628a2a9284d872b2ebc35ef12c6f72
Parents: a498bff
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Jul 10 16:32:44 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Jul 10 16:32:44 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/nio/SelectorManager.java |  34 +++---
 .../transport/nio/SelectorSelection.java        |  24 +++--
 .../transport/tcp/TcpTransportServer.java       | 107 ++++++++++++++-----
 3 files changed, 111 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e957937f/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
index 9be5231..7442b15 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
@@ -17,7 +17,7 @@
 package org.apache.activemq.transport.nio;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractSelectableChannel;
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -43,17 +43,18 @@ public final class SelectorManager {
     private int maxChannelsPerWorker = 1024;
 
     protected ExecutorService createDefaultExecutor() {
-        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(),
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(),
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+            new ThreadFactory() {
 
-            private long i = 0;
+                private long i = 0;
 
-            @Override
-            public Thread newThread(Runnable runnable) {
-                this.i++;
-                final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
-                return t;
-            }
-        });
+                @Override
+                public Thread newThread(Runnable runnable) {
+                    this.i++;
+                    final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
+                    return t;
+                }
+            });
 
         return rc;
     }
@@ -68,27 +69,26 @@ public final class SelectorManager {
 
     public interface Listener {
         void onSelect(SelectorSelection selector);
+
         void onError(SelectorSelection selection, Throwable error);
     }
 
-    public synchronized SelectorSelection register(SocketChannel socketChannel, Listener
listener)
-        throws IOException {
-
+    public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel,
Listener listener) throws IOException {
         SelectorSelection selection = null;
-        while( selection == null ) {
+        while (selection == null) {
             if (freeWorkers.size() > 0) {
                 SelectorWorker worker = freeWorkers.getFirst();
-                if( worker.isReleased() ) {
+                if (worker.isReleased()) {
                     freeWorkers.remove(worker);
                 } else {
                     worker.retain();
-                    selection = new SelectorSelection(worker, socketChannel, listener);
+                    selection = new SelectorSelection(worker, selectableChannel, listener);
                 }
             } else {
                 // Worker starts /w retain count of 1
                 SelectorWorker worker = new SelectorWorker(this);
                 freeWorkers.addFirst(worker);
-                selection = new SelectorSelection(worker, socketChannel, listener);
+                selection = new SelectorSelection(worker, selectableChannel, listener);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e957937f/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
index e96d50d..01480a0 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
@@ -19,13 +19,13 @@ package org.apache.activemq.transport.nio;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractSelectableChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.transport.nio.SelectorManager.Listener;
 
 /**
- * @author chirino
+ *
  */
 public final class SelectorSelection {
 
@@ -33,15 +33,16 @@ public final class SelectorSelection {
     private final Listener listener;
     private int interest;
     private SelectionKey key;
-    private AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
 
-    public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel,
Listener listener) throws ClosedChannelException {
+    public SelectorSelection(final SelectorWorker worker, final AbstractSelectableChannel
selectable, Listener listener) throws ClosedChannelException {
         this.worker = worker;
         this.listener = listener;
         worker.addIoTask(new Runnable() {
+            @Override
             public void run() {
                 try {
-                    SelectorSelection.this.key = socketChannel.register(worker.selector,
0, SelectorSelection.this);
+                    SelectorSelection.this.key = selectable.register(worker.selector, 0,
SelectorSelection.this);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -55,30 +56,32 @@ public final class SelectorSelection {
 
     public void enable() {
         worker.addIoTask(new Runnable() {
+            @Override
             public void run() {
                 try {
                     key.interestOps(interest);
                 } catch (CancelledKeyException e) {
                 }
             }
-        });        
+        });
     }
 
     public void disable() {
         worker.addIoTask(new Runnable() {
+            @Override
             public void run() {
                 try {
                     key.interestOps(0);
                 } catch (CancelledKeyException e) {
                 }
             }
-        });        
+        });
     }
 
     public void close() {
-        // guard against multiple closes.
-        if( closed.compareAndSet(false, true) ) {
+        if (closed.compareAndSet(false, true)) {
             worker.addIoTask(new Runnable() {
+                @Override
                 public void run() {
                     try {
                         key.cancel();
@@ -86,7 +89,7 @@ public final class SelectorSelection {
                     }
                     worker.release();
                 }
-            });        
+            });
         }
     }
 
@@ -97,5 +100,4 @@ public final class SelectorSelection {
     public void onError(Throwable e) {
         listener.onError(this, e);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e957937f/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index 2c97e25..b44a462 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -26,6 +26,9 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -43,6 +46,8 @@ import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerThreadSupport;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -56,15 +61,12 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A TCP based implementation of {@link TransportServer}
- *
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement
modifications)
- *
  */
-
 public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener
{
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
     protected ServerSocket serverSocket;
+    protected SelectorSelection selector;
     protected int backlog = 5000;
     protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
     protected final TcpTransportFactory transportFactory;
@@ -74,7 +76,6 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
     protected boolean useQueueForAccept = true;
     protected boolean allowLinkStealing;
 
-
     /**
      * trace=true -> the Transport stack where this TcpTransport object will be, will
have a TransportLogger layer
      * trace=false -> the Transport stack where this TcpTransport object will be, will
NOT have a TransportLogger layer,
@@ -93,11 +94,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
      * set in Connection or TransportConnector URIs.
      */
     protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
+
     /**
      * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as
there is at least 1
      * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
      */
     protected boolean dynamicManagement = false;
+
     /**
      * startLogging=true -> the TransportLogger object of the Transport stack will initially
write messages to the log.
      * startLogging=false -> the TransportLogger object of the Transport stack will initially
NOT write messages to the
@@ -108,6 +111,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
     protected final ServerSocketFactory serverSocketFactory;
     protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
     protected Thread socketHandlerThread;
+
     /**
      * The maximum number of sockets allowed for this server
      */
@@ -140,8 +144,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         } catch (URISyntaxException e) {
 
             // it could be that the host name contains invalid characters such
-            // as _ on unix platforms
-            // so lets try use the IP address instead
+            // as _ on unix platforms so lets try use the IP address instead
             try {
                 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(),
serverSocket.getLocalPort(), bind.getPath(),
                     bind.getQuery(), bind.getFragment()));
@@ -295,29 +298,76 @@ public class TcpTransportServer extends TransportServerThreadSupport
implements
      */
     @Override
     public void run() {
-        while (!isStopped()) {
-            Socket socket = null;
+        final ServerSocketChannel chan = serverSocket.getChannel();
+        if (chan != null) {
             try {
-                socket = serverSocket.accept();
-                if (socket != null) {
-                    if (isStopped() || getAcceptListener() == null) {
-                        socket.close();
-                    } else {
-                        if (useQueueForAccept) {
-                            socketQueue.put(socket);
+                chan.configureBlocking(false);
+                selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener()
{
+                    @Override
+                    public void onSelect(SelectorSelection sel) {
+                        try {
+                            SocketChannel sc = chan.accept();
+                            if (sc != null) {
+                                if (isStopped() || getAcceptListener() == null) {
+                                    sc.close();
+                                } else {
+                                    if (useQueueForAccept) {
+                                        socketQueue.put(sc.socket());
+                                    } else {
+                                        handleSocket(sc.socket());
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {
+                            onError(sel, e);
+                        }
+                    }
+                    @Override
+                    public void onError(SelectorSelection sel, Throwable error) {
+                        Exception e = null;
+                        if (error instanceof Exception) {
+                            e = (Exception)error;
+                        } else {
+                            e = new Exception(error);
+                        }
+                        if (!isStopping()) {
+                            onAcceptError(e);
+                        } else if (!isStopped()) {
+                            LOG.warn("run()", e);
+                            onAcceptError(e);
+                        }
+                    }
+                });
+                selector.setInterestOps(SelectionKey.OP_ACCEPT);
+                selector.enable();
+            } catch (IOException ex) {
+                selector = null;
+            }
+        } else {
+            while (!isStopped()) {
+                Socket socket = null;
+                try {
+                    socket = serverSocket.accept();
+                    if (socket != null) {
+                        if (isStopped() || getAcceptListener() == null) {
+                            socket.close();
                         } else {
-                            handleSocket(socket);
+                            if (useQueueForAccept) {
+                                socketQueue.put(socket);
+                            } else {
+                                handleSocket(socket);
+                            }
                         }
                     }
-                }
-            } catch (SocketTimeoutException ste) {
-                // expect this to happen
-            } catch (Exception e) {
-                if (!isStopping()) {
-                    onAcceptError(e);
-                } else if (!isStopped()) {
-                    LOG.warn("run()", e);
-                    onAcceptError(e);
+                } catch (SocketTimeoutException ste) {
+                    // expect this to happen
+                } catch (Exception e) {
+                    if (!isStopping()) {
+                        onAcceptError(e);
+                    } else if (!isStopped()) {
+                        LOG.warn("run()", e);
+                        onAcceptError(e);
+                    }
                 }
             }
         }
@@ -405,6 +455,11 @@ public class TcpTransportServer extends TransportServerThreadSupport
implements
 
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
+        if (selector != null) {
+            selector.disable();
+            selector.close();
+            selector = null;
+        }
         if (serverSocket != null) {
             serverSocket.close();
             serverSocket = null;


Mime
View raw message