activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: Add some tests around honoring the transportConnector maximumConnections option
Date Fri, 08 May 2015 18:41:45 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 1359e8eae -> 16a1e2b68


Add some tests around honoring the transportConnector maximumConnections
option

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a812131d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a812131d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a812131d

Branch: refs/heads/master
Commit: a812131db78b3a1c8b9570a5d0294eaa715996f7
Parents: 1359e8e
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri May 8 13:35:40 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri May 8 13:37:53 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpConnectTimeoutTest.java  |   4 +-
 .../transport/amqp/client/AmqpClient.java       |   9 +-
 .../transport/amqp/client/AmqpConnection.java   |   3 +
 .../amqp/client/util/ClientTcpTransport.java    |   7 +-
 .../AmqpConfiguredMaxConnectionsTest.java       | 162 +++++++++++++++++++
 5 files changed, 174 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a812131d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
index 71a9dca..7110a25 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
@@ -73,7 +73,7 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport {
 
     @Override
     protected boolean isUseSslConnector() {
-        return true;
+        return isUseSSL();
     }
 
     @Override
@@ -83,7 +83,7 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport {
 
     @Override
     protected boolean isUseNioPlusSslConnector() {
-        return true;
+        return isUseSSL();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/a812131d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 2762732..175a8de 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -91,14 +91,7 @@ public class AmqpClient {
             throw new IllegalArgumentException("Password must be null if user name value
is null");
         }
 
-        ClientTcpTransport transport = null;
-
-        if (remoteURI.getScheme().equals("tcp")) {
-            transport = new ClientTcpTransport(remoteURI);
-        } else {
-            throw new IllegalArgumentException("Client only support TCP currently.");
-        }
-
+        ClientTcpTransport transport = new ClientTcpTransport(remoteURI);
         AmqpConnection connection = new AmqpConnection(transport, username, password);
 
         connection.setOfferedCapabilities(getOfferedCapabilities());

http://git-wip-us.apache.org/repos/asf/activemq/blob/a812131d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 171a269..d00aec7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -152,6 +152,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection>
implements
                 future.sync();
             } else {
                 future.sync(connectTimeout, TimeUnit.MILLISECONDS);
+                if (getEndpoint().getRemoteState() != EndpointState.ACTIVE) {
+                    throw new IOException("Failed to connect after configured timeout.");
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a812131d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
index 7aa8c62..5708088 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
 
 import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
@@ -326,7 +327,11 @@ public class ClientTcpTransport implements Runnable {
     }
 
     protected SocketFactory createSocketFactory() throws IOException {
-        return SocketFactory.getDefault();
+        if (remoteLocation.getScheme().equalsIgnoreCase("ssl")) {
+            return SSLSocketFactory.getDefault();
+        } else {
+            return SocketFactory.getDefault();
+        }
     }
 
     protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException
{

http://git-wip-us.apache.org/repos/asf/activemq/blob/a812131d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
new file mode 100644
index 0000000..ae3f445
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConfiguredMaxConnectionsTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test for the transportConnector maximumConnections URI option.
+ */
+@RunWith(Parameterized.class)
+public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
+
+    private static final int MAX_CONNECTIONS = 10;
+
+    protected boolean useSSL;
+    protected String connectorScheme;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"amqp", false},
+                {"amqp+nio", false},
+            });
+    }
+
+    public AmqpConfiguredMaxConnectionsTest(String connectorScheme, boolean useSSL) {
+        this.connectorScheme = connectorScheme;
+        this.useSSL = useSSL;
+    }
+
+    @Test(timeout = 60000)
+    public void testMaxConnectionsSettingIsHonored() throws Exception {
+        AmqpClient client = createAmqpClient();
+        assertNotNull(client);
+
+        List<AmqpConnection> connections = new ArrayList<AmqpConnection>();
+
+        for (int i = 0; i < MAX_CONNECTIONS; ++i) {
+            AmqpConnection connection = client.connect();
+            assertNotNull(connection);
+
+            connections.add(connection);
+        }
+
+        assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
+
+        try {
+            AmqpConnection connection = client.createConnection();
+            connection.setConnectTimeout(3000);
+            connection.connect();
+            fail("Should not be able to create one more connection");
+        } catch (Exception ex) {
+        }
+
+        assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
+
+        for (AmqpConnection connection : connections) {
+            connection.close();
+        }
+
+        assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
+    }
+
+
+    protected String getConnectorScheme() {
+        return connectorScheme;
+    }
+
+    protected boolean isUseSSL() {
+        return useSSL;
+    }
+
+    @Override
+    protected boolean isUseSslConnector() {
+        return isUseSSL();
+    }
+
+    @Override
+    protected boolean isUseNioConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioPlusSslConnector() {
+        return isUseSSL();
+    }
+
+    @Override
+    public URI getBrokerAmqpConnectionURI() {
+        try {
+            int port = 0;
+            switch (connectorScheme) {
+                case "amqp":
+                    port = this.amqpPort;
+                    break;
+                case "amqp+ssl":
+                    port = this.amqpSslPort;
+                    break;
+                case "amqp+nio":
+                    port = this.amqpNioPort;
+                    break;
+                case "amqp+nio+ssl":
+                    port = this.amqpNioPlusSslPort;
+                    break;
+                default:
+                    throw new IOException("Invalid AMQP connector scheme passed to test.");
+            }
+
+            String uri = null;
+
+            if (isUseSSL()) {
+                uri = "ssl://127.0.0.1:" + port;
+            } else {
+                uri = "tcp://127.0.0.1:" + port;
+            }
+
+            if (!getAmqpConnectionURIOptions().isEmpty()) {
+                uri = uri + "?" + getAmqpConnectionURIOptions();
+            }
+
+            return new URI(uri);
+        } catch (Exception e) {
+            throw new RuntimeException();
+        }
+    }
+
+    @Override
+    protected String getAdditionalConfig() {
+        return "&maximumConnections=" + MAX_CONNECTIONS;
+    }
+}


Mime
View raw message