activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/2] git commit: Refactored MQTT test suite to use parameterized tests and ensure that the various tests are run on the currently supported transport connectors.
Date Wed, 23 Jul 2014 22:46:34 GMT
Refactored MQTT test suite to use parameterized tests and ensure that
the various tests are run on the currently supported transport
connectors.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fb569e3f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fb569e3f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fb569e3f

Branch: refs/heads/trunk
Commit: fb569e3fbcc11d7a3d8a24a4d43680055c383f5e
Parents: 93f686c
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jul 23 18:46:11 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jul 23 18:46:11 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/AbstractMQTTTest.java        | 127 -------
 .../transport/mqtt/MQTTAuthTestSupport.java     | 115 +++++++
 .../activemq/transport/mqtt/MQTTAuthTests.java  | 174 ++++++++++
 .../activemq/transport/mqtt/MQTTNioTest.java    | 121 -------
 .../activemq/transport/mqtt/MQTTSSLTest.java    |  96 ------
 .../activemq/transport/mqtt/MQTTTest.java       | 337 ++++---------------
 .../transport/mqtt/MQTTTestSupport.java         | 221 ++++++------
 .../activemq/transport/mqtt/MQTTTests.java      |  71 ----
 .../transport/mqtt/PahoMQTNioTTest.java         | 119 -------
 .../activemq/transport/mqtt/PahoMQTTTest.java   | 110 +++++-
 10 files changed, 577 insertions(+), 914 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
deleted file mode 100644
index 13a0e75..0000000
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
-import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-
-public abstract class AbstractMQTTTest extends AutoFailTestSupport {
-    protected TransportConnector mqttConnector;
-    protected TransportConnector openwireConnector;
-
-    public static final int AT_MOST_ONCE =0;
-    public static final int AT_LEAST_ONCE = 1;
-    public static final int EXACTLY_ONCE =2;
-
-    public File basedir() throws IOException {
-        ProtectionDomain protectionDomain = getClass().getProtectionDomain();
-        return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
-    }
-
-    protected BrokerService brokerService;
-    protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
-    protected int numberOfMessages;
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        exceptions.clear();
-        brokerService = new BrokerService();
-        brokerService.setPersistent(false);
-        brokerService.setAdvisorySupport(false);
-        brokerService.setUseJmx(false);
-        this.numberOfMessages = 1000;
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        if (brokerService != null) {
-            brokerService.stop();
-        }
-        super.tearDown();
-    }
-
-    protected String getProtocolScheme() {
-        return "mqtt";
-    }
-
-    protected void addMQTTConnector() throws Exception {
-        addMQTTConnector("");
-    }
-
-    protected void addMQTTConnector(String config) throws Exception {
-        mqttConnector = brokerService.addConnector(getProtocolScheme()+"://localhost:0?" + config);
-    }
-
-    protected void addOpenwireConnector() throws Exception {
-        openwireConnector = brokerService.addConnector("tcp://localhost:0");
-    }
-
-    protected void initializeConnection(MQTTClientProvider provider) throws Exception {
-        provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
-    }
-
-    protected static interface Task {
-        public void run() throws Exception;
-    }
-
-    protected  void within(int time, TimeUnit unit, Task task) throws InterruptedException {
-        long timeMS = unit.toMillis(time);
-        long deadline = System.currentTimeMillis() + timeMS;
-        while (true) {
-            try {
-                task.run();
-                return;
-            } catch (Throwable e) {
-                long remaining = deadline - System.currentTimeMillis();
-                if( remaining <=0 ) {
-                    if( e instanceof RuntimeException ) {
-                        throw (RuntimeException)e;
-                    }
-                    if( e instanceof Error ) {
-                        throw (Error)e;
-                    }
-                    throw new RuntimeException(e);
-                }
-                Thread.sleep(Math.min(timeMS/10, remaining));
-            }
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
new file mode 100644
index 0000000..bbe4d1a
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTestSupport.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationEntry;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.TempDestinationAuthorizationEntry;
+
+/**
+ * Used as a base class for MQTT tests that require Authentication and Authorization
+ * to be configured on the Broker.
+ */
+public class MQTTAuthTestSupport extends MQTTTestSupport {
+
+    @Override
+    protected BrokerPlugin configureAuthentication() throws Exception {
+        List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
+        users.add(new AuthenticationUser("admin", "admin", "users,admins"));
+        users.add(new AuthenticationUser("user", "password", "users"));
+        users.add(new AuthenticationUser("guest", "password", "guests"));
+        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
+        authenticationPlugin.setAnonymousAccessAllowed(true);
+
+        return authenticationPlugin;
+    }
+
+    @Override
+    protected BrokerPlugin configureAuthorization() throws Exception {
+
+        @SuppressWarnings("rawtypes")
+        List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
+
+        AuthorizationEntry entry = new AuthorizationEntry();
+        entry.setQueue(">");
+        entry.setRead("admins");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setQueue("USERS.>");
+        entry.setRead("users");
+        entry.setWrite("users");
+        entry.setAdmin("users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setQueue("GUEST.>");
+        entry.setRead("guests");
+        entry.setWrite("guests,users");
+        entry.setAdmin("guests,users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic(">");
+        entry.setRead("admins");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("USERS.>");
+        entry.setRead("users");
+        entry.setWrite("users");
+        entry.setAdmin("users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("GUEST.>");
+        entry.setRead("guests");
+        entry.setWrite("guests,users");
+        entry.setAdmin("guests,users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("anonymous");
+        entry.setRead("guests,anonymous");
+        entry.setWrite("guests,users,anonymous");
+        entry.setAdmin("guests,users,anonymous");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("ActiveMQ.Advisory.>");
+        entry.setRead("guests,users,anonymous");
+        entry.setWrite("guests,users,anonymous");
+        entry.setAdmin("guests,users,anonymous");
+        authorizationEntries.add(entry);
+
+        TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
+        tempEntry.setRead("admins");
+        tempEntry.setWrite("admins");
+        tempEntry.setAdmin("admins");
+
+        DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
+        authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
+        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
+
+        return authorizationPlugin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
new file mode 100644
index 0000000..8c832b7
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.net.ProtocolException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests various use cases that require authentication or authorization over MQTT
+ */
+@RunWith(Parameterized.class)
+public class MQTTAuthTests extends MQTTAuthTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTAuthTests.class);
+
+    @Parameters(name= "{index}: scheme({0})")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"mqtt", false},
+                {"mqtt+ssl", true},
+                {"mqtt+nio", false}
+                // TODO - Fails {"mqtt+nio+ssl", true}
+            });
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testAnonymousUserConnect() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setCleanSession(true);
+        mqtt.setUserName((String)null);
+        mqtt.setPassword((String)null);
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        LOG.info("Connected as anonymous client");
+        connection.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
+        MQTT mqttPub = createMQTTConnection("pub", true);
+        mqttPub.setUserName("foo");
+        mqttPub.setPassword("bar");
+
+        final AtomicBoolean failed = new AtomicBoolean();
+
+        mqttPub.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client received: {}", frame);
+                if (frame.messageType() == CONNACK.TYPE) {
+                    CONNACK connAck = new CONNACK();
+                    try {
+                        connAck.decode(frame);
+                        LOG.info("{}", connAck);
+                        assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
+                    } catch (ProtocolException e) {
+                        failed.set(true);
+                        fail("Error decoding publish " + e.getMessage());
+                    } catch (Throwable err) {
+                        failed.set(true);
+                        throw err;
+                    }
+                }
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client sent: {}", frame);
+            }
+        });
+
+        BlockingConnection connectionPub = mqttPub.blockingConnection();
+        try {
+            connectionPub.connect();
+            fail("Should not be able to connect.");
+        } catch (Exception e) {
+        }
+
+        assertFalse("connection should have failed.", failed.get());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testFailedSubscription() throws Exception {
+        final String ANONYMOUS = "anonymous";
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("foo");
+        mqtt.setKeepAlive((short) 2);
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final String NAMED = "named";
+        byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
+        assertEquals((byte) 0x80, qos[0]);
+        assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);
+
+        // validate the subscription by sending a retained message
+        connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
+        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(ANONYMOUS, new String(msg.getPayload()));
+        msg.ack();
+
+        connection.unsubscribe(new String[] { ANONYMOUS });
+        qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) });
+        assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
+
+        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(ANONYMOUS, new String(msg.getPayload()));
+        msg.ack();
+
+        connection.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testWildcardRetainedSubscription() throws Exception {
+        MQTT mqttPub = createMQTTConnection("pub", true);
+        mqttPub.setUserName("admin");
+        mqttPub.setPassword("admin");
+
+        BlockingConnection connectionPub = mqttPub.blockingConnection();
+        connectionPub.connect();
+        connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
+
+        MQTT mqttSub = createMQTTConnection("sub", true);
+        mqttSub.setUserName("user");
+        mqttSub.setPassword("password");
+        BlockingConnection connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+        connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
+        Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
+        assertNull("Shouldn't receive the message", msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
deleted file mode 100644
index 7104e41..0000000
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import java.util.LinkedList;
-
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.security.AuthenticationUser;
-import org.apache.activemq.security.AuthorizationEntry;
-import org.apache.activemq.security.AuthorizationPlugin;
-import org.apache.activemq.security.DefaultAuthorizationMap;
-import org.apache.activemq.security.SimpleAuthenticationPlugin;
-import org.apache.activemq.util.Wait;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(BlockJUnit4ClassRunner.class)
-public class MQTTNioTest extends MQTTTest {
-    protected static final Logger LOG = LoggerFactory.getLogger(MQTTNioTest.class);
-
-    @Rule
-    public TestName testname = new TestName();
-
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        LOG.debug("Starting {}", testname.getMethodName());
-    }
-
-    @Override
-    protected String getProtocolScheme() {
-        return "mqtt+nio";
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testPingOnMQTTNIO() throws Exception {
-        addMQTTConnector("maxInactivityDuration=-1");
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId("test-mqtt");
-        mqtt.setKeepAlive((short)2);
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-        assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return connection.isConnected();
-            }
-        }));
-
-        connection.disconnect();
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testAnonymousUserConnect() throws Exception {
-        addMQTTConnector();
-        configureAuthentication(brokerService);
-        brokerService.start();
-        brokerService.waitUntilStarted();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setCleanSession(true);
-        mqtt.setUserName((String)null);
-        mqtt.setPassword((String)null);
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-
-        System.out.println("Connected!");
-
-        connection.disconnect();
-
-    }
-
-    private void configureAuthentication(BrokerService brokerService) throws Exception {
-        LinkedList<AuthenticationUser> users = new LinkedList<AuthenticationUser>();
-        users.add(new AuthenticationUser("user1", "user1", "anonymous,user1group"));
-        final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
-
-        DefaultAuthorizationMap map = new DefaultAuthorizationMap();
-        LinkedList<DestinationMapEntry> authz = new LinkedList<DestinationMapEntry>();
-        AuthorizationEntry entry = new AuthorizationEntry();
-        entry.setDestination(new ActiveMQTopic(">"));
-        entry.setAdmin("admins");
-        entry.setRead("admins,anonymous");
-        entry.setWrite("admins");
-        authz.add(entry);
-        map.setAuthorizationEntries(authz);
-        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map);
-        authenticationPlugin.setAnonymousAccessAllowed(true);
-
-        brokerService.setPlugins(new BrokerPlugin[]{
-                authenticationPlugin, authorizationPlugin
-        });
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
deleted file mode 100644
index 1eb4ff5..0000000
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import java.security.SecureRandom;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-
-import org.fusesource.mqtt.client.MQTT;
-import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(BlockJUnit4ClassRunner.class)
-public class MQTTSSLTest extends MQTTTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLTest.class);
-
-    public void setUp() throws Exception {
-        String basedir = basedir().getPath();
-        System.setProperty("javax.net.ssl.trustStore", basedir+"/src/test/resources/client.keystore");
-        System.setProperty("javax.net.ssl.trustStorePassword", "password");
-        System.setProperty("javax.net.ssl.trustStoreType", "jks");
-        System.setProperty("javax.net.ssl.keyStore", basedir+"/src/test/resources/server.keystore");
-        System.setProperty("javax.net.ssl.keyStorePassword", "password");
-        System.setProperty("javax.net.ssl.keyStoreType", "jks");
-        super.setUp();
-    }
-
-    @Override
-    protected String getProtocolScheme() {
-        return "mqtt+ssl";
-    }
-
-    protected MQTT createMQTTConnection() throws Exception {
-        MQTT mqtt = new MQTT();
-        mqtt.setConnectAttemptsMax(1);
-        mqtt.setReconnectAttemptsMax(0);
-        mqtt.setTracer(createTracer());
-        mqtt.setHost("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
-        SSLContext ctx = SSLContext.getInstance("TLS");
-        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
-        mqtt.setSslContext(ctx);
-        return mqtt;
-    }
-
-    protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
-        MQTT mqtt = createMQTTConnection();
-        if (clientId != null) {
-            mqtt.setClientId(clientId);
-        }
-        mqtt.setCleanSession(clean);
-        return mqtt;
-    }
-
-    protected void initializeConnection(MQTTClientProvider provider) throws Exception {
-        SSLContext ctx = SSLContext.getInstance("TLS");
-        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
-        provider.setSslContext(ctx);
-        provider.connect("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
-    }
-
-
-
-    static class DefaultTrustManager implements X509TrustManager {
-
-        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
-        }
-
-        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
-        }
-
-        public X509Certificate[] getAcceptedIssuers() {
-            return new X509Certificate[0];
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 7d5a87e..ec7f1cc 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,10 +16,18 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -27,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -35,26 +44,13 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotEquals;
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.jaas.GroupPrincipal;
-import org.apache.activemq.security.AuthenticationUser;
-import org.apache.activemq.security.AuthorizationEntry;
-import org.apache.activemq.security.AuthorizationPlugin;
-import org.apache.activemq.security.DefaultAuthorizationMap;
-import org.apache.activemq.security.SimpleAuthenticationPlugin;
-import org.apache.activemq.security.SimpleAuthorizationMap;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
@@ -66,28 +62,42 @@ import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
 import org.fusesource.mqtt.codec.PUBLISH;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MQTTTest extends AbstractMQTTTest {
+@RunWith(Parameterized.class)
+public class MQTTTest extends MQTTTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
 
+    private static final int NUM_MESSAGES = 250;
+
+    @Parameters(name= "{index}: scheme({0})")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"mqtt", false},
+                {"mqtt+ssl", true},
+                {"mqtt+nio", false}
+                // TODO - Fails {"mqtt+nio+ssl", true}
+            });
+    }
+
     @Test(timeout = 60 * 1000)
     public void testSendAndReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
         final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
         initializeConnection(subscriptionProvider);
 
         subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
 
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+        final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
 
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
-                for (int i = 0; i < numberOfMessages; i++) {
+                for (int i = 0; i < NUM_MESSAGES; i++) {
                     try {
                         byte[] payload = subscriptionProvider.receive(10000);
                         assertNotNull("Should get a message", payload);
@@ -105,7 +115,7 @@ public class MQTTTest extends AbstractMQTTTest {
         final MQTTClientProvider publishProvider = getMQTTClientProvider();
         initializeConnection(publishProvider);
 
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Message " + i;
             publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
         }
@@ -118,8 +128,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testUnsubscribeMQTT() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
         final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
         initializeConnection(subscriptionProvider);
 
@@ -127,12 +135,12 @@ public class MQTTTest extends AbstractMQTTTest {
 
         subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
 
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages / 2);
+        final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES / 2);
 
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
-                for (int i = 0; i < numberOfMessages; i++) {
+                for (int i = 0; i < NUM_MESSAGES; i++) {
                     try {
                         byte[] payload = subscriptionProvider.receive(10000);
                         assertNotNull("Should get a message", payload);
@@ -150,9 +158,9 @@ public class MQTTTest extends AbstractMQTTTest {
         final MQTTClientProvider publishProvider = getMQTTClientProvider();
         initializeConnection(publishProvider);
 
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Message " + i;
-            if (i == numberOfMessages / 2) {
+            if (i == NUM_MESSAGES / 2) {
                 subscriptionProvider.unsubscribe(topic);
             }
             publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
@@ -171,13 +179,10 @@ public class MQTTTest extends AbstractMQTTTest {
          * with AT_MOST_ONCE - in MQTT the QoS is always determined by the
          * message as published - not the wish of the subscriber
          */
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
         provider.subscribe("foo", EXACTLY_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
             byte[] message = provider.receive(5000);
@@ -189,13 +194,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 2 * 60 * 1000)
     public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
         provider.subscribe("foo", EXACTLY_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
             byte[] message = provider.receive(5000);
@@ -207,13 +209,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 2 * 60 * 1000)
     public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
         provider.subscribe("foo", AT_MOST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
             byte[] message = provider.receive(5000);
@@ -225,13 +224,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testSendAndReceiveAtMostOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
         provider.subscribe("foo", AT_MOST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
             byte[] message = provider.receive(5000);
@@ -243,13 +239,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 2 * 60 * 1000)
     public void testSendAndReceiveAtLeastOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
         provider.subscribe("foo", AT_LEAST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
             byte[] message = provider.receive(5000);
@@ -261,8 +254,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testSendAndReceiveExactlyOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
         final MQTTClientProvider publisher = getMQTTClientProvider();
         initializeConnection(publisher);
 
@@ -270,7 +261,7 @@ public class MQTTTest extends AbstractMQTTTest {
         initializeConnection(subscriber);
 
         subscriber.subscribe("foo", EXACTLY_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
             byte[] message = subscriber.receive(5000);
@@ -287,9 +278,6 @@ public class MQTTTest extends AbstractMQTTTest {
         for (int i = 0; i < payload.length; i++) {
             payload[i] = '2';
         }
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider publisher = getMQTTClientProvider();
         initializeConnection(publisher);
 
@@ -310,10 +298,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testSendAndReceiveRetainedMessages() throws Exception {
-
-        addMQTTConnector();
-        brokerService.start();
-
         final MQTTClientProvider publisher = getMQTTClientProvider();
         initializeConnection(publisher);
 
@@ -348,9 +332,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 30 * 1000)
     public void testValidZeroLengthClientId() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("");
         mqtt.setCleanSession(true);
@@ -362,9 +343,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 2 *  60 * 1000)
     public void testMQTTPathPatterns() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("");
         mqtt.setCleanSession(true);
@@ -434,9 +412,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testMQTTRetainQoS() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         String[] topics = { "AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE" };
         for (int i = 0; i < topics.length; i++) {
             final String topic = topics[i];
@@ -480,9 +455,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testDuplicateSubscriptions() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
         mqtt.setKeepAlive((short) 2);
@@ -528,9 +500,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 120 * 1000)
     public void testRetainedMessage() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection();
         mqtt.setKeepAlive((short) 2);
 
@@ -598,58 +567,7 @@ public class MQTTTest extends AbstractMQTTTest {
     }
 
     @Test(timeout = 60 * 1000)
-    public void testFailedSubscription() throws Exception {
-        addMQTTConnector();
-
-        final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin();
-        authenticationPlugin.setAnonymousAccessAllowed(true);
-
-        final String ANONYMOUS = "anonymous";
-        authenticationPlugin.setAnonymousGroup(ANONYMOUS);
-        final DefaultAuthorizationMap map = new DefaultAuthorizationMap();
-        // only one authorized destination, anonymous for anonymous group!
-        map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS));
-        final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map, map, map));
-
-        brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin });
-        brokerService.start();
-
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId("foo");
-        mqtt.setKeepAlive((short) 2);
-
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-
-        final String NAMED = "named";
-        byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
-        assertEquals((byte) 0x80, qos[0]);
-        assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);
-
-        // validate the subscription by sending a retained message
-        connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
-        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
-        assertNotNull(msg);
-        assertEquals(ANONYMOUS, new String(msg.getPayload()));
-        msg.ack();
-
-        connection.unsubscribe(new String[] { ANONYMOUS });
-        qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) });
-        assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
-
-        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
-        assertNotNull(msg);
-        assertEquals(ANONYMOUS, new String(msg.getPayload()));
-        msg.ack();
-
-        connection.disconnect();
-    }
-
-    @Test(timeout = 60 * 1000)
     public void testUniqueMessageIds() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
         mqtt.setKeepAlive((short) 2);
@@ -737,9 +655,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testResendMessageId() throws Exception {
-        addMQTTConnector("trace=true");
-        brokerService.start();
-
         final MQTT mqtt = createMQTTConnection("resend", false);
         mqtt.setKeepAlive((short) 5);
 
@@ -806,9 +721,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 90 * 1000)
     public void testPacketIdGeneratorNonCleanSession() throws Exception {
-        addMQTTConnector("trace=true");
-        brokerService.start();
-
         final MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
         mqtt.setKeepAlive((short) 15);
 
@@ -882,9 +794,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 90 * 1000)
     public void testPacketIdGeneratorCleanSession() throws Exception {
-        addMQTTConnector("trace=true");
-        brokerService.start();
-
         final String[] cleanClientIds = new String[] { "", "clean-packetid", null };
         final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
         MQTT[] mqtts = new MQTT[cleanClientIds.length];
@@ -944,9 +853,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testClientConnectionFailure() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection("reconnect", false);
         final BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
@@ -983,9 +889,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testCleanSession() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         final String CLIENTID = "cleansession";
         final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
         BlockingConnection notClean = mqttNotClean.blockingConnection();
@@ -1025,10 +928,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
-
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
         final String DESTINATION_NAME = "foo.*";
@@ -1037,7 +936,7 @@ public class MQTTTest extends AbstractMQTTTest {
         final String RETAINED = "RETAINED";
         provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
 
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
         // MUST set to true to receive retained messages
         activeMQConnection.setUseRetroactiveConsumer(true);
         activeMQConnection.start();
@@ -1052,7 +951,7 @@ public class MQTTTest extends AbstractMQTTTest {
         assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
         assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
 
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
             message = (ActiveMQMessage) consumer.receive(5000);
@@ -1067,13 +966,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 2 * 60 * 1000)
     public void testSendJMSReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
 
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
         activeMQConnection.setUseRetroactiveConsumer(true);
         activeMQConnection.start();
         Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -1094,7 +990,7 @@ public class MQTTTest extends AbstractMQTTTest {
         assertNotNull("Should get retained message", message);
         assertEquals(RETAINED, new String(message));
 
-        for (int i = 0; i < numberOfMessages; i++) {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "This is Test Message: " + i;
             sendMessage = s.createTextMessage(payload);
             producer.send(sendMessage);
@@ -1109,8 +1005,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
         mqtt.setKeepAlive((short) 2);
@@ -1130,8 +1024,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testTurnOffInactivityMonitor() throws Exception {
-        addMQTTConnector("transport.useInactivityMonitor=false");
-        brokerService.start();
+        stopBroker();
+        protocolConfig = "transport.useInactivityMonitor=false";
+        startBroker();
+
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo3");
         mqtt.setKeepAlive((short) 2);
@@ -1151,13 +1047,8 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 30 * 10000)
     public void testJmsMapping() throws Exception {
-        addMQTTConnector();
-        addOpenwireConnector();
-        brokerService.start();
-
         // start up jms consumer
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + openwireConnector.getConnectUri().getPort());
-        Connection jmsConn = factory.createConnection();
+        Connection jmsConn = cf.createConnection();
         Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination dest = session.createTopic("test.foo");
         MessageConsumer consumer = session.createConsumer(dest);
@@ -1204,8 +1095,6 @@ public class MQTTTest extends AbstractMQTTTest {
             payload[i] = '2';
         }
 
-        addMQTTConnector();
-        brokerService.start();
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("MQTT-Client");
         mqtt.setCleanSession(false);
@@ -1245,10 +1134,7 @@ public class MQTTTest extends AbstractMQTTTest {
         int numberOfRuns = 100;
         int messagesPerRun = 2;
 
-        addMQTTConnector("trace=true");
-        brokerService.start();
         final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
-
         final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
 
         final BlockingConnection connectionPub = mqttPub.blockingConnection();
@@ -1298,9 +1184,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 30 * 1000)
     public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
-        // default keep alive in milliseconds
-        addMQTTConnector("transport.defaultKeepAlive=2000");
-        brokerService.start();
+        stopBroker();
+        protocolConfig = "transport.defaultKeepAlive=2000";
+        startBroker();
+
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
         mqtt.setKeepAlive((short) 0);
@@ -1318,9 +1205,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testReuseConnection() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("Test-Client");
 
@@ -1340,9 +1224,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception {
-        addMQTTConnector();
-        brokerService.setPersistent(true);
-        brokerService.start();
         Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
 
         MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true);
@@ -1395,8 +1276,6 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout = 60 * 1000)
     public void testMQTT311Connection() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
         mqtt.setVersion("3.1.1");
@@ -1406,63 +1285,7 @@ public class MQTTTest extends AbstractMQTTTest {
     }
 
     @Test(timeout = 60 * 1000)
-    public void testWildcardRetainedSubscription() throws Exception {
-        addMQTTConnector();
-
-        LinkedList<AuthenticationUser> users = new LinkedList<AuthenticationUser>();
-        users.add(new AuthenticationUser("user", "user", "users"));
-        users.add(new AuthenticationUser("admin", "admin", "admins"));
-        final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
-
-
-        DefaultAuthorizationMap map = new DefaultAuthorizationMap();
-        LinkedList<DestinationMapEntry> authz = new LinkedList<DestinationMapEntry>();
-
-        AuthorizationEntry entryOne = new AuthorizationEntry();
-        entryOne.setDestination(new ActiveMQTopic("one"));
-        entryOne.setAdmin("admins");
-        entryOne.setRead("admins");
-        entryOne.setWrite("admins");
-        authz.add(entryOne);
-
-        AuthorizationEntry entryTwo = new AuthorizationEntry();
-        entryTwo.setDestination(new ActiveMQTopic("two"));
-        entryTwo.setAdmin("users");
-        entryTwo.setRead("users");
-        entryTwo.setWrite("users");
-        authz.add(entryTwo);
-
-        map.setAuthorizationEntries(authz);
-        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map);
-
-        brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin });
-
-        brokerService.start();
-
-        MQTT mqttPub = createMQTTConnection("pub", true);
-        mqttPub.setUserName("admin");
-        mqttPub.setPassword("admin");
-
-        BlockingConnection connectionPub = mqttPub.blockingConnection();
-        connectionPub.connect();
-        connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
-
-        MQTT mqttSub = createMQTTConnection("sub", true);
-        mqttSub.setUserName("user");
-        mqttSub.setPassword("user");
-        BlockingConnection connectionSub = mqttSub.blockingConnection();
-        connectionSub.connect();
-        connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
-        Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
-        assertNull("Shouldn't receive the message", msg);
-    }
-
-    @Test(timeout = 60 * 1000)
     public void testActiveMQRecoveryPolicy() throws Exception {
-        addMQTTConnector();
-
-        brokerService.start();
-
         // test with ActiveMQ LastImageSubscriptionRecoveryPolicy
         final PolicyMap policyMap = new PolicyMap();
         final PolicyEntry policyEntry = new PolicyEntry();
@@ -1507,49 +1330,25 @@ public class MQTTTest extends AbstractMQTTTest {
         assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
     }
 
-    @Override
-    protected String getProtocolScheme() {
-        return "mqtt";
-    }
-
-    protected MQTTClientProvider getMQTTClientProvider() {
-        return new FuseMQQTTClientProvider();
-    }
-
-    protected MQTT createMQTTConnection() throws Exception {
-        return createMQTTConnection(null, false);
-    }
-
-    protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
-        MQTT mqtt = new MQTT();
-        mqtt.setConnectAttemptsMax(1);
-        mqtt.setReconnectAttemptsMax(0);
-        mqtt.setTracer(createTracer());
-        if (clientId != null) {
-            mqtt.setClientId(clientId);
-        }
-        mqtt.setCleanSession(clean);
-        mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
-        // shut off connect retry
-        return mqtt;
-    }
+    @Test(timeout = 60 * 1000)
+    public void testPingOnMQTT() throws Exception {
+        stopBroker();
+        protocolConfig = "maxInactivityDuration=-1";
+        startBroker();
 
-    protected Tracer createTracer() {
-        return new Tracer() {
-            @Override
-            public void onReceive(MQTTFrame frame) {
-                LOG.info("Client Received:\n" + frame);
-            }
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("test-mqtt");
+        mqtt.setKeepAlive((short)2);
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
 
             @Override
-            public void onSend(MQTTFrame frame) {
-                LOG.info("Client Sent:\n" + frame);
+            public boolean isSatisified() throws Exception {
+                return connection.isConnected();
             }
+        }));
 
-            @Override
-            public void debug(String message, Object... args) {
-                LOG.info(String.format(message, args));
-            }
-        };
+        connection.disconnect();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
index 7fad946..4571a36 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
@@ -20,14 +20,20 @@ package org.apache.activemq.transport.mqtt;
 import java.io.File;
 import java.io.IOException;
 import java.security.ProtectionDomain;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerPlugin;
@@ -36,14 +42,6 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
-import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.security.AuthenticationUser;
-import org.apache.activemq.security.AuthorizationEntry;
-import org.apache.activemq.security.AuthorizationPlugin;
-import org.apache.activemq.security.DefaultAuthorizationMap;
-import org.apache.activemq.security.SimpleAuthenticationPlugin;
-import org.apache.activemq.security.TempDestinationAuthorizationEntry;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Tracer;
@@ -52,6 +50,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
+import org.junit.runners.Parameterized.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,13 +60,17 @@ public class MQTTTestSupport {
 
     protected BrokerService brokerService;
     protected int port;
-    protected int sslPort;
-    protected int nioPort;
-    protected int nioSslPort;
     protected String jmsUri = "vm://localhost";
     protected ActiveMQConnectionFactory cf;
     protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
-    protected int numberOfMessages;
+    protected boolean persistent;
+    protected String protocolConfig;
+
+    @Parameter(0)
+    public String protocolScheme;
+
+    @Parameter(1)
+    public boolean useSSL;
 
     public static final int AT_MOST_ONCE = 0;
     public static final int AT_LEAST_ONCE = 1;
@@ -80,18 +83,14 @@ public class MQTTTestSupport {
         return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
     }
 
-    public static void main(String[] args) throws Exception {
-        final MQTTTestSupport s = new MQTTTestSupport();
-
-        s.sslPort = 5675;
-        s.port = 5676;
-        s.nioPort = 5677;
-        s.nioSslPort = 5678;
+    public MQTTTestSupport() {
+        this.protocolScheme = "mqtt";
+        this.useSSL = false;
+    }
 
-        s.startBroker();
-        while(true) {
-            Thread.sleep(100000);
-        }
+    public MQTTTestSupport(String connectorScheme, boolean useSsl) {
+        this.protocolScheme = connectorScheme;
+        this.useSSL = useSsl;
     }
 
     public String getName() {
@@ -100,8 +99,16 @@ public class MQTTTestSupport {
 
     @Before
     public void setUp() throws Exception {
+
+        String basedir = basedir().getPath();
+        System.setProperty("javax.net.ssl.trustStore", basedir + "/src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", basedir + "/src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
         exceptions.clear();
-        numberOfMessages = 1000;
         startBroker();
     }
 
@@ -162,84 +169,16 @@ public class MQTTTestSupport {
         brokerService = new BrokerService();
         brokerService.setPersistent(isPersistent());
         brokerService.setAdvisorySupport(false);
-        brokerService.setSchedulerSupport(true);
+        brokerService.setSchedulerSupport(isSchedulerSupportEnabled());
         brokerService.setPopulateJMSXUserID(true);
-        brokerService.setSchedulerSupport(true);
-
-        JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl();
-        jobStore.setDirectory(new File("activemq-data"));
-
-        brokerService.setJobSchedulerStore(jobStore);
     }
 
     protected BrokerPlugin configureAuthentication() throws Exception {
-        List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
-        users.add(new AuthenticationUser("system", "manager", "users,admins"));
-        users.add(new AuthenticationUser("user", "password", "users"));
-        users.add(new AuthenticationUser("guest", "password", "guests"));
-        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
-
-        return authenticationPlugin;
+        return null;
     }
 
     protected BrokerPlugin configureAuthorization() throws Exception {
-
-        @SuppressWarnings("rawtypes")
-        List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
-
-        AuthorizationEntry entry = new AuthorizationEntry();
-        entry.setQueue(">");
-        entry.setRead("admins");
-        entry.setWrite("admins");
-        entry.setAdmin("admins");
-        authorizationEntries.add(entry);
-        entry = new AuthorizationEntry();
-        entry.setQueue("USERS.>");
-        entry.setRead("users");
-        entry.setWrite("users");
-        entry.setAdmin("users");
-        authorizationEntries.add(entry);
-        entry = new AuthorizationEntry();
-        entry.setQueue("GUEST.>");
-        entry.setRead("guests");
-        entry.setWrite("guests,users");
-        entry.setAdmin("guests,users");
-        authorizationEntries.add(entry);
-        entry = new AuthorizationEntry();
-        entry.setTopic(">");
-        entry.setRead("admins");
-        entry.setWrite("admins");
-        entry.setAdmin("admins");
-        authorizationEntries.add(entry);
-        entry = new AuthorizationEntry();
-        entry.setTopic("USERS.>");
-        entry.setRead("users");
-        entry.setWrite("users");
-        entry.setAdmin("users");
-        authorizationEntries.add(entry);
-        entry = new AuthorizationEntry();
-        entry.setTopic("GUEST.>");
-        entry.setRead("guests");
-        entry.setWrite("guests,users");
-        entry.setAdmin("guests,users");
-        authorizationEntries.add(entry);
-        entry = new AuthorizationEntry();
-        entry.setTopic("ActiveMQ.Advisory.>");
-        entry.setRead("guests,users");
-        entry.setWrite("guests,users");
-        entry.setAdmin("guests,users");
-        authorizationEntries.add(entry);
-
-        TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
-        tempEntry.setRead("admins");
-        tempEntry.setWrite("admins");
-        tempEntry.setAdmin("admins");
-
-        DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
-        authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
-        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
-
-        return authorizationPlugin;
+        return null;
     }
 
     protected void applyBrokerPolicies() throws Exception {
@@ -255,8 +194,16 @@ public class MQTTTestSupport {
         // Overrides of this method can add additional configuration options or add multiple
         // MQTT transport connectors as needed, the port variable is always supposed to be
         // assigned the primary MQTT connector's port.
-        TransportConnector connector = brokerService.addConnector(getProtocolScheme() + "://0.0.0.0:" + port);
-        port = connector.getConnectUri().getPort();
+
+        StringBuilder connectorURI = new StringBuilder();
+        connectorURI.append(getProtocolScheme());
+        connectorURI.append("://0.0.0.0:").append(port);
+        if (protocolConfig != null && !protocolConfig.isEmpty()) {
+            connectorURI.append("?").append(protocolConfig);
+        }
+
+        port = brokerService.addConnector(connectorURI.toString()).getConnectUri().getPort();
+        LOG.info("Added connector {} to broker", getProtocolScheme());
     }
 
     public void stopBroker() throws Exception {
@@ -299,7 +246,7 @@ public class MQTTTestSupport {
 
     /**
      * Initialize an MQTTClientProvider instance.  By default this method uses the port that's
-     * assigned to be the TCP based port using the base version of addMQTTConnector.  A sbuclass
+     * assigned to be the TCP based port using the base version of addMQTTConnector.  A subclass
      * can either change the value of port or override this method to assign the correct port.
      *
      * @param provider
@@ -308,14 +255,41 @@ public class MQTTTestSupport {
      * @throws Exception if an error occurs during initialization.
      */
     protected void initializeConnection(MQTTClientProvider provider) throws Exception {
-        provider.connect("tcp://localhost:" + port);
+        if (!isUseSSL()) {
+            provider.connect("tcp://localhost:" + port);
+        } else {
+            SSLContext ctx = SSLContext.getInstance("TLS");
+            ctx.init(new KeyManager[0], new TrustManager[] { new DefaultTrustManager() }, new SecureRandom());
+            provider.setSslContext(ctx);
+            provider.connect("ssl://localhost:" + port);
+        }
+    }
+
+    public String getProtocolScheme() {
+        return protocolScheme;
+    }
+
+    public void setProtocolScheme(String scheme) {
+        this.protocolScheme = scheme;
+    }
+
+    public boolean isUseSSL() {
+        return this.useSSL;
+    }
+
+    public void setUseSSL(boolean useSSL) {
+        this.useSSL = useSSL;
+    }
+
+    public boolean isPersistent() {
+        return persistent;
     }
 
-    protected String getProtocolScheme() {
-        return "mqtt";
+    public int getPort() {
+        return this.port;
     }
 
-    protected boolean isPersistent() {
+    public boolean isSchedulerSupportEnabled() {
         return false;
     }
 
@@ -355,6 +329,14 @@ public class MQTTTestSupport {
     }
 
     protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
+        if (isUseSSL()) {
+            return createMQTTSslConnection(clientId, clean);
+        } else {
+            return createMQTTTcpConnection(clientId, clean);
+        }
+    }
+
+    private MQTT createMQTTTcpConnection(String clientId, boolean clean) throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setConnectAttemptsMax(1);
         mqtt.setReconnectAttemptsMax(0);
@@ -367,6 +349,23 @@ public class MQTTTestSupport {
         return mqtt;
     }
 
+    private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setTracer(createTracer());
+        mqtt.setHost("ssl://localhost:" + port);
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
+
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(new KeyManager[0], new TrustManager[] { new DefaultTrustManager() }, new SecureRandom());
+        mqtt.setSslContext(ctx);
+        return mqtt;
+    }
+
     protected Tracer createTracer() {
         return new Tracer() {
             @Override
@@ -385,4 +384,20 @@ public class MQTTTestSupport {
             }
         };
     }
+
+    static class DefaultTrustManager implements X509TrustManager {
+
+        @Override
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
deleted file mode 100644
index ba2e964..0000000
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.net.ProtocolException;
-
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Tracer;
-import org.fusesource.mqtt.codec.CONNACK;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MQTTTests extends MQTTTestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTTests.class);
-
-    @Test(timeout = 60 * 1000)
-    public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
-        MQTT mqttPub = createMQTTConnection("pub", true);
-        mqttPub.setUserName("admin");
-        mqttPub.setPassword("admin");
-
-        mqttPub.setTracer(new Tracer() {
-            @Override
-            public void onReceive(MQTTFrame frame) {
-                LOG.info("Client received: {}", frame);
-                if (frame.messageType() == CONNACK.TYPE) {
-                    CONNACK connAck = new CONNACK();
-                    try {
-                        connAck.decode(frame);
-                        LOG.info("{}", connAck);
-                        assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
-                    } catch (ProtocolException e) {
-                        fail("Error decoding publish " + e.getMessage());
-                    }
-                }
-            }
-
-            @Override
-            public void onSend(MQTTFrame frame) {
-                LOG.info("Client sent: {}", frame);
-            }
-        });
-
-        BlockingConnection connectionPub = mqttPub.blockingConnection();
-        try {
-            connectionPub.connect();
-            fail("Should not be able to connect.");
-        } catch (Exception e) {}
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java
deleted file mode 100644
index 589f3bb..0000000
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.TransportConnector;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PahoMQTNioTTest extends PahoMQTTTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class);
-
-    @Override
-    protected String getProtocolScheme() {
-        return "mqtt+nio";
-    }
-
-    @Test(timeout = 300000)
-    public void testLotsOfClients() throws Exception {
-
-        final int CLIENTS = Integer.getInteger("PahoMQTNioTTest.CLIENTS", 100);
-        LOG.info("Using: " + CLIENTS + " clients");
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
-
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
-        activeMQConnection.start();
-        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
-
-        final AtomicInteger receiveCounter = new AtomicInteger();
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                receiveCounter.incrementAndGet();
-            }
-        });
-
-        final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>();
-        final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
-        final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
-        final CountDownLatch sendBarrier = new CountDownLatch(1);
-        for (int i = 0; i < CLIENTS; i++) {
-            Thread.sleep(10);
-            new Thread(null, null, "client:" + i) {
-                @Override
-                public void run() {
-                    try {
-                        MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(),
-                            new MemoryPersistence());
-                        client.connect();
-                        connectedDoneLatch.countDown();
-                        sendBarrier.await();
-                        for (int i = 0; i < 10; i++) {
-                            Thread.sleep(1000);
-                            client.publish("test", "hello".getBytes(), 1, false);
-                        }
-                        client.disconnect();
-                        client.close();
-                    } catch (Throwable e) {
-                        e.printStackTrace();
-                        asyncError.set(e);
-                    } finally {
-                        disconnectDoneLatch.countDown();
-                    }
-                }
-            }.start();
-        }
-
-        connectedDoneLatch.await();
-        assertNull("Async error: " + asyncError.get(), asyncError.get());
-        sendBarrier.countDown();
-
-        LOG.info("All clients connected... waiting to receive sent messages...");
-
-        // We should eventually get all the messages.
-        within(30, TimeUnit.SECONDS, new Task() {
-            @Override
-            public void run() throws Exception {
-                assertTrue(receiveCounter.get() == CLIENTS * 10);
-            }
-        });
-
-        LOG.info("All messages received.");
-
-        disconnectDoneLatch.await();
-        assertNull("Async error: " + asyncError.get(), asyncError.get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fb569e3f/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index 3af4b53..17305be 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -16,31 +16,125 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.TransportConnector;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class PahoMQTTTest extends MQTTTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
+
+    @Parameters(name= "{index}: scheme({0})")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"mqtt", false},
+                {"mqtt+nio", false}
+            });
+    }
+
+    @Test(timeout = 300000)
+    public void testLotsOfClients() throws Exception {
+
+        final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
+        LOG.info("Using: {} clients", CLIENTS);
 
-public class PahoMQTTTest extends AbstractMQTTTest {
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
+
+        final AtomicInteger receiveCounter = new AtomicInteger();
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                receiveCounter.incrementAndGet();
+            }
+        });
+
+        final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>();
+        final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
+        final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
+        final CountDownLatch sendBarrier = new CountDownLatch(1);
+        for (int i = 0; i < CLIENTS; i++) {
+            Thread.sleep(10);
+            new Thread(null, null, "client:" + i) {
+                @Override
+                public void run() {
+                    try {
+                        MqttClient client = new MqttClient("tcp://localhost:" + getPort(),
+                                                           Thread.currentThread().getName(),
+                                                           new MemoryPersistence());
+                        client.connect();
+                        connectedDoneLatch.countDown();
+                        sendBarrier.await();
+                        for (int i = 0; i < 10; i++) {
+                            Thread.sleep(1000);
+                            client.publish("test", "hello".getBytes(), 1, false);
+                        }
+                        client.disconnect();
+                        client.close();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        asyncError.set(e);
+                    } finally {
+                        disconnectDoneLatch.countDown();
+                    }
+                }
+            }.start();
+        }
+
+        connectedDoneLatch.await();
+        assertNull("Async error: " + asyncError.get(), asyncError.get());
+        sendBarrier.countDown();
+
+        LOG.info("All clients connected... waiting to receive sent messages...");
+
+        // We should eventually get all the messages.
+        within(30, TimeUnit.SECONDS, new Task() {
+            @Override
+            public void run() throws Exception {
+                assertTrue(receiveCounter.get() == CLIENTS * 10);
+            }
+        });
+
+        LOG.info("All messages received.");
+
+        disconnectDoneLatch.await();
+        assertNull("Async error: " + asyncError.get(), asyncError.get());
+    }
 
     @Test(timeout=300000)
     public void testSendAndReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
 
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
         activeMQConnection.start();
         Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
 
-        MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid", new MemoryPersistence());
+        MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence());
         client.connect();
         client.publish("test", "hello".getBytes(), 1, false);
 


Mime
View raw message