Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1731D200BB8 for ; Sat, 12 Nov 2016 16:03:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 15BE6160B00; Sat, 12 Nov 2016 15:03:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B62E2160AF4 for ; Sat, 12 Nov 2016 16:03:16 +0100 (CET) Received: (qmail 61274 invoked by uid 500); 12 Nov 2016 15:03:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 61265 invoked by uid 99); 12 Nov 2016 15:03:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Nov 2016 15:03:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E0DBE0158; Sat, 12 Nov 2016 15:03:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6505 Date: Sat, 12 Nov 2016 15:03:15 +0000 (UTC) archived-at: Sat, 12 Nov 2016 15:03:18 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.14.x 41ce86bd9 -> cc633e691 https://issues.apache.org/jira/browse/AMQ-6505 Fixing the auto transport protocol detection so that the byte buffer that captures the initial bytes for detection is not shared across threads. This was causing failed connections under high load and high cpu usage under NIO (cherry picked from commit 7e648d512d06508d85f6a4e111d9adbdb9e33a82) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc633e69 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc633e69 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc633e69 Branch: refs/heads/activemq-5.14.x Commit: cc633e6913f36b63da6d3b2a2bda4f9cff8c6a66 Parents: 41ce86b Author: Christopher L. Shannon (cshannon) Authored: Sat Nov 12 10:00:32 2016 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Sat Nov 12 10:03:07 2016 -0500 ---------------------------------------------------------------------- .../transport/auto/AutoSslTransportFactory.java | 11 +- .../transport/auto/AutoSslTransportServer.java | 5 +- .../transport/auto/AutoTcpTransportFactory.java | 12 +- .../transport/auto/AutoTcpTransportServer.java | 19 +- .../auto/nio/AutoNIOSSLTransportServer.java | 2 +- .../auto/nio/AutoNioTransportFactory.java | 7 +- .../auto/AutoTransportConnectionsTest.java | 208 +++++++++++++++++++ .../auto/AutoTransportMaxConnectionsTest.java | 151 -------------- 8 files changed, 226 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java index 19704e9..d1ad524 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java @@ -32,6 +32,7 @@ import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.tcp.SslTransportFactory; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.util.IOExceptionSupport; @@ -96,18 +97,12 @@ public class AutoSslTransportFactory extends SslTransportFactory implements Brok protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory, this.brokerService, enabledProtocols) { - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) - throws IOException { - setDefaultLinkStealing(format, this); - return super.createTransport(socket, format); - } @Override protected TcpTransport createTransport(Socket socket, WireFormat format, - TcpTransportFactory detectedTransportFactory) throws IOException { + TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException { setDefaultLinkStealing(format, this); - return super.createTransport(socket, format, detectedTransportFactory); + return super.createTransport(socket, format, detectedTransportFactory, initBuffer); } }; return server; http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java index acd9998..74ac7ed 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java @@ -29,6 +29,7 @@ import javax.net.ssl.SSLServerSocketFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.transport.tcp.SslTransportFactory; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.wireformat.WireFormat; @@ -120,9 +121,9 @@ public class AutoSslTransportServer extends AutoTcpTransportServer { */ @Override protected TcpTransport createTransport(Socket socket, WireFormat format, - TcpTransportFactory detectedTransportFactory) throws IOException { + TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException { - return detectedTransportFactory.createTransport(format, socket, this.initBuffer); + return detectedTransportFactory.createTransport(format, socket, initBuffer); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java index 8316422..10ddca0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java @@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.util.IOExceptionSupport; @@ -88,17 +89,10 @@ public class AutoTcpTransportFactory extends TcpTransportFactory implements Brok AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) - throws IOException { - setDefaultLinkStealing(format, this); - return super.createTransport(socket, format); - } - - @Override protected TcpTransport createTransport(Socket socket, WireFormat format, - TcpTransportFactory detectedTransportFactory) throws IOException { + TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException { setDefaultLinkStealing(format, this); - return super.createTransport(socket, format, detectedTransportFactory); + return super.createTransport(socket, format, detectedTransportFactory, initBuffer); } }; http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java index 64162dd..8eeb6ac 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java @@ -222,12 +222,6 @@ public class AutoTcpTransportServer extends TcpTransportServer { protected final ThreadPoolExecutor service; - - /** - * This holds the initial buffer that has been read to detect the protocol. - */ - public InitBuffer initBuffer; - @Override protected void handleSocket(final Socket socket) { final AutoTcpTransportServer server = this; @@ -272,7 +266,7 @@ public class AutoTcpTransportServer extends TcpTransportServer { data.flip(); ProtocolInfo protocolInfo = detectProtocol(data.array()); - initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get())); + InitBuffer initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get())); initBuffer.buffer.put(data.array()); if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { @@ -280,7 +274,7 @@ public class AutoTcpTransportServer extends TcpTransportServer { } WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); - Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory); + Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory, initBuffer); return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); } @@ -299,11 +293,6 @@ public class AutoTcpTransportServer extends TcpTransportServer { } } - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { - return new TcpTransport(format, socket, this.initBuffer); - } - /** * @param socket * @param format @@ -311,8 +300,8 @@ public class AutoTcpTransportServer extends TcpTransportServer { * @return */ protected TcpTransport createTransport(Socket socket, WireFormat format, - TcpTransportFactory detectedTransportFactory) throws IOException { - return createTransport(socket, format); + TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException { + return new TcpTransport(format, socket, initBuffer); } public void setWireFormatOptions(Map> wireFormatOptions) { http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java index a8d4eb9..cb38d7e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java @@ -130,7 +130,7 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { waitForProtocolDetectionFinish(future, in.getReadSize()); in.stop(); - initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length)); + InitBuffer initBuffer = new InitBuffer(in.getReadSize().get(), ByteBuffer.allocate(in.getReadData().length)); initBuffer.buffer.put(in.getReadData()); ProtocolInfo protocolInfo = detectProtocol(in.getReadData()); http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java index e4cd539..ec9b278 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java @@ -34,6 +34,7 @@ import org.apache.activemq.transport.auto.AutoTcpTransportServer; import org.apache.activemq.transport.auto.AutoTransportUtils; import org.apache.activemq.transport.nio.NIOTransportFactory; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; @@ -58,13 +59,13 @@ public class AutoNioTransportFactory extends NIOTransportFactory implements Brok protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { @Override - protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException { + protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory, InitBuffer initBuffer) throws IOException { TcpTransport nioTransport = null; if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) { - nioTransport = new AutoNIOTransport(format, socket,this.initBuffer); + nioTransport = new AutoNIOTransport(format, socket, initBuffer); } else { nioTransport = detectedTransportFactory.createTransport( - format, socket, this.initBuffer); + format, socket, initBuffer); } if (format.getClass().toString().contains("MQTT")) { http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java new file mode 100644 index 0000000..02a72cf --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConnectionsTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.auto; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class AutoTransportConnectionsTest { + + @Rule + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + private static final int maxConnections = 20; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private String connectionUri; + private BrokerService service; + private TransportConnector connector; + private final String transportType; + + @Parameters(name="transport={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"auto"}, + {"auto+nio"}, + {"auto+ssl"}, + {"auto+nio+ssl"}, + }); + } + + + public AutoTransportConnectionsTest(String transportType) { + super(); + this.transportType = transportType; + } + + + @Before + public void setUp() throws Exception { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + + service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + + service.stop(); + service.waitUntilStopped(); + } + + public void configureConnectorAndStart(String bindAddress) throws Exception { + connector = service.addConnector(bindAddress); + connectionUri = connector.getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + @Test + public void testMaxConnectionControl() throws Exception { + configureConnectorAndStart(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections); + + final ConnectionFactory cf = createConnectionFactory(); + final CountDownLatch startupLatch = new CountDownLatch(1); + + //create an extra 10 connections above max + for(int i = 0; i < maxConnections + 10; i++) { + final int count = i; + executor.submit(new Runnable() { + @Override + public void run() { + Connection conn = null; + try { + startupLatch.await(); + //sleep for a short period of time + Thread.sleep(count * 3); + conn = cf.createConnection(); + conn.start(); + } catch (Exception e) { + } + } + }); + } + + TcpTransportServer transportServer = (TcpTransportServer)connector.getServer(); + // ensure the max connections is in effect + assertEquals(maxConnections, transportServer.getMaximumConnections()); + // No connections at first + assertEquals(0, connector.getConnections().size()); + // Release the latch to set up connections in parallel + startupLatch.countDown(); + + final TransportConnector connector = this.connector; + + // Expect the max connections is created + assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connector.getConnections().size() == maxConnections; + } + }) + ); + + } + + @Test + public void testConcurrentConnections() throws Exception { + configureConnectorAndStart(transportType + "://0.0.0.0:0"); + + int connectionAttempts = 50; + ConnectionFactory factory = createConnectionFactory(); + final AtomicInteger connectedCount = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + + try { + for (int i = 0; i < connectionAttempts; i++) { + executor.execute(new Runnable() { + + @Override + public void run() { + try { + latch.await(); + Connection con = factory.createConnection(); + con.start(); + connectedCount.incrementAndGet(); + } catch (Exception e) { + //print for debugging but don't fail it might just be the transport stopping + e.printStackTrace(); + } + + } + }); + } + latch.countDown(); + + //Make sure all attempts connected without error + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + return connectedCount.get() == connectionAttempts; + } + })); + + } catch (Exception e) { + //print for debugging but don't fail it might just be the transport stopping + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/cc633e69/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java deleted file mode 100644 index 77fd74f..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.auto; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.transport.tcp.TcpTransportServer; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -public class AutoTransportMaxConnectionsTest { - - public static final String KEYSTORE_TYPE = "jks"; - public static final String PASSWORD = "password"; - public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; - public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; - private static final int maxConnections = 20; - - private final ExecutorService executor = Executors.newCachedThreadPool(); - private String connectionUri; - private BrokerService service; - private TransportConnector connector; - private final String transportType; - - @Parameters - public static Collection data() { - return Arrays.asList(new Object[][] { - {"auto"}, - {"auto+nio"}, - {"auto+ssl"}, - {"auto+nio+ssl"}, - }); - } - - - public AutoTransportMaxConnectionsTest(String transportType) { - super(); - this.transportType = transportType; - } - - - @Before - public void setUp() throws Exception { - System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); - System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); - System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); - System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); - System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); - System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); - - service = new BrokerService(); - service.setPersistent(false); - service.setUseJmx(false); - connector = service.addConnector(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections); - connectionUri = connector.getPublishableConnectString(); - service.start(); - service.waitUntilStarted(); - } - - protected ConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory(connectionUri); - } - - @Test(timeout=60000) - public void testMaxConnectionControl() throws Exception { - final ConnectionFactory cf = createConnectionFactory(); - final CountDownLatch startupLatch = new CountDownLatch(1); - - //create an extra 10 connections above max - for(int i = 0; i < maxConnections + 10; i++) { - final int count = i; - executor.submit(new Runnable() { - @Override - public void run() { - Connection conn = null; - try { - startupLatch.await(); - //sleep for a short period of time - Thread.sleep(count * 3); - conn = cf.createConnection(); - conn.start(); - } catch (Exception e) { - //JmsUtils.closeConnection(conn); - } - } - }); - } - - TcpTransportServer transportServer = (TcpTransportServer)connector.getServer(); - // ensure the max connections is in effect - assertEquals(maxConnections, transportServer.getMaximumConnections()); - // No connections at first - assertEquals(0, connector.getConnections().size()); - // Release the latch to set up connections in parallel - startupLatch.countDown(); - - final TransportConnector connector = this.connector; - - // Expect the max connections is created - assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return connector.getConnections().size() == maxConnections; - } - }) - ); - - } - - @After - public void tearDown() throws Exception { - executor.shutdown(); - - service.stop(); - service.waitUntilStopped(); - } -}