activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6505
Date Sat, 12 Nov 2016 15:03:15 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 41ce86bd9 -> cc633e691


https://issues.apache.org/jira/browse/AMQ-6505

Fixing the auto transport protocol detection so that the byte buffer
that captures the initial bytes for detection is not shared across
threads. This was causing failed connections under high load and high CPU
usage under NIO.

(cherry picked from commit 7e648d512d06508d85f6a4e111d9adbdb9e33a82)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc633e69
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc633e69
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc633e69

Branch: refs/heads/activemq-5.14.x
Commit: cc633e6913f36b63da6d3b2a2bda4f9cff8c6a66
Parents: 41ce86b
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Sat Nov 12 10:00:32 2016 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Sat Nov 12 10:03:07 2016 -0500

----------------------------------------------------------------------
 .../transport/auto/AutoSslTransportFactory.java |  11 +-
 .../transport/auto/AutoSslTransportServer.java  |   5 +-
 .../transport/auto/AutoTcpTransportFactory.java |  12 +-
 .../transport/auto/AutoTcpTransportServer.java  |  19 +-
 .../auto/nio/AutoNIOSSLTransportServer.java     |   2 +-
 .../auto/nio/AutoNioTransportFactory.java       |   7 +-
 .../auto/AutoTransportConnectionsTest.java      | 208 +++++++++++++++++++
 .../auto/AutoTransportMaxConnectionsTest.java   | 151 --------------
 8 files changed, 226 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java
index 19704e9..d1ad524 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java
@@ -32,6 +32,7 @@ import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.tcp.SslTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -96,18 +97,12 @@ public class AutoSslTransportFactory extends SslTransportFactory implements Brok
     protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory,
                 this.brokerService, enabledProtocols) {
-            @Override
-            protected TcpTransport createTransport(Socket socket, WireFormat format)
-                    throws IOException {
-                setDefaultLinkStealing(format, this);
-                return super.createTransport(socket, format);
-            }
 
             @Override
             protected TcpTransport createTransport(Socket socket, WireFormat format,
-                    TcpTransportFactory detectedTransportFactory) throws IOException {
+                    TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
                 setDefaultLinkStealing(format, this);
-                return super.createTransport(socket, format, detectedTransportFactory);
+                return super.createTransport(socket, format, detectedTransportFactory, initBuffer);
             }
         };
         return server;

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java
index acd9998..74ac7ed 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java
@@ -29,6 +29,7 @@ import javax.net.ssl.SSLServerSocketFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.transport.tcp.SslTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -120,9 +121,9 @@ public class AutoSslTransportServer extends AutoTcpTransportServer {
      */
     @Override
     protected TcpTransport createTransport(Socket socket, WireFormat format,
-            TcpTransportFactory detectedTransportFactory) throws IOException {
+            TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
 
-        return detectedTransportFactory.createTransport(format, socket, this.initBuffer);
+        return detectedTransportFactory.createTransport(format, socket, initBuffer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java
index 8316422..10ddca0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java
@@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -88,17 +89,10 @@ public class AutoTcpTransportFactory extends TcpTransportFactory implements Brok
         AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) {
 
             @Override
-            protected TcpTransport createTransport(Socket socket, WireFormat format)
-                    throws IOException {
-                setDefaultLinkStealing(format, this);
-                return super.createTransport(socket, format);
-            }
-
-            @Override
             protected TcpTransport createTransport(Socket socket, WireFormat format,
-                    TcpTransportFactory detectedTransportFactory) throws IOException {
+                    TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
                 setDefaultLinkStealing(format, this);
-                return super.createTransport(socket, format, detectedTransportFactory);
+                return super.createTransport(socket, format, detectedTransportFactory, initBuffer);
             }
 
         };

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
index 64162dd..8eeb6ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java
@@ -222,12 +222,6 @@ public class AutoTcpTransportServer extends TcpTransportServer {
 
     protected final ThreadPoolExecutor service;
 
-
-    /**
-     * This holds the initial buffer that has been read to detect the protocol.
-     */
-    public InitBuffer initBuffer;
-
     @Override
     protected void handleSocket(final Socket socket) {
         final AutoTcpTransportServer server = this;
@@ -272,7 +266,7 @@ public class AutoTcpTransportServer extends TcpTransportServer {
         data.flip();
         ProtocolInfo protocolInfo = detectProtocol(data.array());
 
-        initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get()));
+        InitBuffer initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get()));
         initBuffer.buffer.put(data.array());
 
         if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
@@ -280,7 +274,7 @@ public class AutoTcpTransportServer extends TcpTransportServer {
         }
 
         WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat();
-        Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory);
+        Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory, initBuffer);
 
         return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
     }
@@ -299,11 +293,6 @@ public class AutoTcpTransportServer extends TcpTransportServer {
         }
     }
 
-    @Override
-    protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
-        return new TcpTransport(format, socket, this.initBuffer);
-    }
-
     /**
      * @param socket
      * @param format
@@ -311,8 +300,8 @@ public class AutoTcpTransportServer extends TcpTransportServer {
      * @return
      */
     protected TcpTransport createTransport(Socket socket, WireFormat format,
-            TcpTransportFactory detectedTransportFactory) throws IOException {
-        return createTransport(socket, format);
+            TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
+        return new TcpTransport(format, socket, initBuffer);
     }
 
     public void setWireFormatOptions(Map<String, Map<String, Object>> wireFormatOptions) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
index a8d4eb9..cb38d7e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
@@ -130,7 +130,7 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
         waitForProtocolDetectionFinish(future, in.getReadSize());
         in.stop();
 
-        initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
+        InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length));
         initBuffer.buffer.put(in.getReadData());
 
         ProtocolInfo protocolInfo = detectProtocol(in.getReadData());

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
index e4cd539..ec9b278 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java
@@ -34,6 +34,7 @@ import org.apache.activemq.transport.auto.AutoTcpTransportServer;
 import org.apache.activemq.transport.auto.AutoTransportUtils;
 import org.apache.activemq.transport.nio.NIOTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -58,13 +59,13 @@ public class AutoNioTransportFactory extends NIOTransportFactory implements Brok
     protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) {
             @Override
-            protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException {
+            protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException {
                 TcpTransport nioTransport = null;
                 if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) {
-                    nioTransport = new AutoNIOTransport(format, socket,this.initBuffer);
+                    nioTransport = new AutoNIOTransport(format, socket, initBuffer);
                 } else {
                     nioTransport = detectedTransportFactory.createTransport(
-                            format, socket, this.initBuffer);
+                            format, socket, initBuffer);
                 }
 
                 if (format.getClass().toString().contains("MQTT")) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
new file mode 100644
index 0000000..02a72cf
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.auto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class AutoTransportConnectionsTest {
+
+    @Rule
+    public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
+    private static final int maxConnections = 20;
+
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+    private String connectionUri;
+    private BrokerService service;
+    private TransportConnector connector;
+    private final String transportType;
+
+    @Parameters(name="transport={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"auto"},
+                {"auto+nio"},
+                {"auto+ssl"},
+                {"auto+nio+ssl"},
+            });
+    }
+
+
+    public AutoTransportConnectionsTest(String transportType) {
+        super();
+        this.transportType = transportType;
+    }
+
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+
+        service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+
+        service.stop();
+        service.waitUntilStopped();
+    }
+
+    public void configureConnectorAndStart(String bindAddress) throws Exception {
+        connector = service.addConnector(bindAddress);
+        connectionUri = connector.getPublishableConnectString();
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @Test
+    public void testMaxConnectionControl() throws Exception {
+        configureConnectorAndStart(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections);
+
+        final ConnectionFactory cf = createConnectionFactory();
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+
+        //create an extra 10 connections above max
+        for(int i = 0; i < maxConnections + 10; i++) {
+            final int count = i;
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    Connection conn = null;
+                    try {
+                        startupLatch.await();
+                        //sleep for a short period of time
+                        Thread.sleep(count * 3);
+                        conn = cf.createConnection();
+                        conn.start();
+                    } catch (Exception e) {
+                    }
+                }
+            });
+        }
+
+        TcpTransportServer transportServer = (TcpTransportServer)connector.getServer();
+        // ensure the max connections is in effect
+        assertEquals(maxConnections, transportServer.getMaximumConnections());
+        // No connections at first
+        assertEquals(0, connector.getConnections().size());
+        // Release the latch to set up connections in parallel
+        startupLatch.countDown();
+
+        final TransportConnector connector = this.connector;
+
+        // Expect the max connections is created
+        assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(),
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connector.getConnections().size() == maxConnections;
+                }
+            })
+        );
+
+    }
+
+    @Test
+    public void testConcurrentConnections() throws Exception {
+        configureConnectorAndStart(transportType + "://0.0.0.0:0");
+
+        int connectionAttempts = 50;
+        ConnectionFactory factory = createConnectionFactory();
+        final AtomicInteger connectedCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try {
+            for (int i = 0; i < connectionAttempts; i++) {
+                executor.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            latch.await();
+                            Connection con = factory.createConnection();
+                            con.start();
+                            connectedCount.incrementAndGet();
+                        } catch (Exception e) {
+                            //print for debugging but don't fail it might just be the transport stopping
+                            e.printStackTrace();
+                        }
+
+                    }
+                });
+            }
+            latch.countDown();
+
+            //Make sure all attempts connected without error
+            assertTrue(Wait.waitFor(new Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connectedCount.get() == connectionAttempts;
+                }
+            }));
+
+        } catch (Exception e) {
+            //print for debugging but don't fail it might just be the transport stopping
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
deleted file mode 100644
index 77fd74f..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.auto;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class AutoTransportMaxConnectionsTest {
-
-    public static final String KEYSTORE_TYPE = "jks";
-    public static final String PASSWORD = "password";
-    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
-    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
-    private static final int maxConnections = 20;
-
-    private final ExecutorService executor = Executors.newCachedThreadPool();
-    private String connectionUri;
-    private BrokerService service;
-    private TransportConnector connector;
-    private final String transportType;
-
-    @Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-                {"auto"},
-                {"auto+nio"},
-                {"auto+ssl"},
-                {"auto+nio+ssl"},
-            });
-    }
-
-
-    public AutoTransportMaxConnectionsTest(String transportType) {
-        super();
-        this.transportType = transportType;
-    }
-
-
-    @Before
-    public void setUp() throws Exception {
-        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
-        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
-        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
-        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
-        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
-        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
-
-        service = new BrokerService();
-        service.setPersistent(false);
-        service.setUseJmx(false);
-        connector = service.addConnector(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections);
-        connectionUri = connector.getPublishableConnectString();
-        service.start();
-        service.waitUntilStarted();
-    }
-
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory(connectionUri);
-    }
-
-    @Test(timeout=60000)
-    public void testMaxConnectionControl() throws Exception {
-        final ConnectionFactory cf = createConnectionFactory();
-        final CountDownLatch startupLatch = new CountDownLatch(1);
-
-        //create an extra 10 connections above max
-        for(int i = 0; i < maxConnections + 10; i++) {
-            final int count = i;
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    Connection conn = null;
-                    try {
-                        startupLatch.await();
-                        //sleep for a short period of time
-                        Thread.sleep(count * 3);
-                        conn = cf.createConnection();
-                        conn.start();
-                    } catch (Exception e) {
-                        //JmsUtils.closeConnection(conn);
-                    }
-                }
-            });
-        }
-
-        TcpTransportServer transportServer = (TcpTransportServer)connector.getServer();
-        // ensure the max connections is in effect
-        assertEquals(maxConnections, transportServer.getMaximumConnections());
-        // No connections at first
-        assertEquals(0, connector.getConnections().size());
-        // Release the latch to set up connections in parallel
-        startupLatch.countDown();
-
-        final TransportConnector connector = this.connector;
-
-        // Expect the max connections is created
-        assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(),
-            Wait.waitFor(new Wait.Condition() {
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return connector.getConnections().size() == maxConnections;
-                }
-            })
-        );
-
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        executor.shutdown();
-
-        service.stop();
-        service.waitUntilStopped();
-    }
-}


Mime
View raw message