activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5233
Date Mon, 21 Jul 2014 19:55:29 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 0ebbd5d97 -> 3653f81b5


https://issues.apache.org/jira/browse/AMQ-5233

Return the right code when a client connects with bad credentials.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3653f81b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3653f81b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3653f81b

Branch: refs/heads/trunk
Commit: 3653f81b5b11217217afb4a372abbf6ed804e05e
Parents: 0ebbd5d
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jul 21 15:55:06 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jul 21 15:55:06 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  49 ++-
 .../transport/mqtt/MQTTTestSupport.java         | 388 +++++++++++++++++++
 .../activemq/transport/mqtt/MQTTTests.java      |  71 ++++
 .../mqtt/util/ResourceLoadingSslContext.java    | 237 +++++++++++
 4 files changed, 742 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3653f81b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 56f7fbd..7f39be0 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -37,7 +38,31 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.PersistenceAdapterSupport;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -49,7 +74,21 @@ import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.*;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBACK;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -253,7 +292,11 @@ public class MQTTProtocolConverter {
                     Throwable exception = ((ExceptionResponse) response).getException();
                     //let the client know
                     CONNACK ack = new CONNACK();
-                    ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                    if (exception instanceof SecurityException) {
+                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
+                    } else {
+                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                    }
                     getMQTTTransport().sendToMQTT(ack.encode());
                     getMQTTTransport().onException(IOExceptionSupport.create(exception));
                     return;

http://git-wip-us.apache.org/repos/asf/activemq/blob/3653f81b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
new file mode 100644
index 0000000..7fad946
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.mqtt;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationEntry;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.TempDestinationAuthorizationEntry;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
+
+    protected BrokerService brokerService;
+    protected int port;
+    protected int sslPort;
+    protected int nioPort;
+    protected int nioSslPort;
+    protected String jmsUri = "vm://localhost";
+    protected ActiveMQConnectionFactory cf;
+    protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
+    protected int numberOfMessages;
+
+    public static final int AT_MOST_ONCE = 0;
+    public static final int AT_LEAST_ONCE = 1;
+    public static final int EXACTLY_ONCE = 2;
+
+    @Rule public TestName name = new TestName();
+
+    public File basedir() throws IOException {
+        ProtectionDomain protectionDomain = getClass().getProtectionDomain();
+        return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()),
"../..").getCanonicalFile();
+    }
+
+    public static void main(String[] args) throws Exception {
+        final MQTTTestSupport s = new MQTTTestSupport();
+
+        s.sslPort = 5675;
+        s.port = 5676;
+        s.nioPort = 5677;
+        s.nioSslPort = 5678;
+
+        s.startBroker();
+        while(true) {
+            Thread.sleep(100000);
+        }
+    }
+
+    public String getName() {
+        return name.getMethodName();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        exceptions.clear();
+        numberOfMessages = 1000;
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+
+    public void startBroker() throws Exception {
+
+        createBroker();
+
+        applyBrokerPolicies();
+        applyMemoryLimitPolicy();
+
+        // Setup SSL context...
+        File keyStore = new File(basedir(), "src/test/resources/server.keystore");
+        File trustStore = new File(basedir(), "src/test/resources/client.keystore");
+
+        final ResourceLoadingSslContext sslContext = new ResourceLoadingSslContext();
+        sslContext.setKeyStore(keyStore.getCanonicalPath());
+        sslContext.setKeyStorePassword("password");
+        sslContext.setTrustStore(trustStore.getCanonicalPath());
+        sslContext.setTrustStorePassword("password");
+        sslContext.afterPropertiesSet();
+        brokerService.setSslContext(sslContext);
+
+        ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
+
+        addMQTTConnector();
+        addOpenWireConnector();
+
+        cf = new ActiveMQConnectionFactory(jmsUri);
+
+        BrokerPlugin authenticationPlugin = configureAuthentication();
+        if (authenticationPlugin != null) {
+            plugins.add(configureAuthorization());
+        }
+
+        BrokerPlugin authorizationPlugin = configureAuthorization();
+        if (authorizationPlugin != null) {
+            plugins.add(configureAuthentication());
+        }
+
+        if (!plugins.isEmpty()) {
+            BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
+            brokerService.setPlugins(plugins.toArray(array));
+        }
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    protected void applyMemoryLimitPolicy() throws Exception {
+    }
+
+    protected void createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(isPersistent());
+        brokerService.setAdvisorySupport(false);
+        brokerService.setSchedulerSupport(true);
+        brokerService.setPopulateJMSXUserID(true);
+        brokerService.setSchedulerSupport(true);
+
+        JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl();
+        jobStore.setDirectory(new File("activemq-data"));
+
+        brokerService.setJobSchedulerStore(jobStore);
+    }
+
+    protected BrokerPlugin configureAuthentication() throws Exception {
+        List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
+        users.add(new AuthenticationUser("system", "manager", "users,admins"));
+        users.add(new AuthenticationUser("user", "password", "users"));
+        users.add(new AuthenticationUser("guest", "password", "guests"));
+        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
+
+        return authenticationPlugin;
+    }
+
+    protected BrokerPlugin configureAuthorization() throws Exception {
+
+        @SuppressWarnings("rawtypes")
+        List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
+
+        AuthorizationEntry entry = new AuthorizationEntry();
+        entry.setQueue(">");
+        entry.setRead("admins");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setQueue("USERS.>");
+        entry.setRead("users");
+        entry.setWrite("users");
+        entry.setAdmin("users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setQueue("GUEST.>");
+        entry.setRead("guests");
+        entry.setWrite("guests,users");
+        entry.setAdmin("guests,users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic(">");
+        entry.setRead("admins");
+        entry.setWrite("admins");
+        entry.setAdmin("admins");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("USERS.>");
+        entry.setRead("users");
+        entry.setWrite("users");
+        entry.setAdmin("users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("GUEST.>");
+        entry.setRead("guests");
+        entry.setWrite("guests,users");
+        entry.setAdmin("guests,users");
+        authorizationEntries.add(entry);
+        entry = new AuthorizationEntry();
+        entry.setTopic("ActiveMQ.Advisory.>");
+        entry.setRead("guests,users");
+        entry.setWrite("guests,users");
+        entry.setAdmin("guests,users");
+        authorizationEntries.add(entry);
+
+        TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
+        tempEntry.setRead("admins");
+        tempEntry.setWrite("admins");
+        tempEntry.setAdmin("admins");
+
+        DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
+        authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
+        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
+
+        return authorizationPlugin;
+    }
+
+    protected void applyBrokerPolicies() throws Exception {
+        // NOOP here
+    }
+
+    protected void addOpenWireConnector() throws Exception {
+        TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
+        jmsUri = connector.getPublishableConnectString();
+    }
+
+    protected void addMQTTConnector() throws Exception {
+        // Overrides of this method can add additional configuration options or add multiple
+        // MQTT transport connectors as needed, the port variable is always supposed to be
+        // assigned the primary MQTT connector's port.
+        TransportConnector connector = brokerService.addConnector(getProtocolScheme() + "://0.0.0.0:"
+ port);
+        port = connector.getConnectUri().getPort();
+    }
+
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + name.getMethodName();
+    }
+
+    protected String getTopicName() {
+        return getClass().getName() + "." + name.getMethodName();
+    }
+
+    protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException
{
+        ObjectName brokerViewMBean = new ObjectName(
+            "org.apache.activemq:type=Broker,brokerName=localhost");
+        BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
+        return proxy;
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException,
JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException,
JMSException {
+        ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
+        TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true);
+        return proxy;
+    }
+
+    /**
+     * Initialize an MQTTClientProvider instance.  By default this method uses the port that's
+     * assigned to be the TCP based port using the base version of addMQTTConnector.  A sbuclass
+     * can either change the value of port or override this method to assign the correct
port.
+     *
+     * @param provider
+     *        the MQTTClientProvider instance to initialize.
+     *
+     * @throws Exception if an error occurs during initialization.
+     */
+    protected void initializeConnection(MQTTClientProvider provider) throws Exception {
+        provider.connect("tcp://localhost:" + port);
+    }
+
+    protected String getProtocolScheme() {
+        return "mqtt";
+    }
+
+    protected boolean isPersistent() {
+        return false;
+    }
+
+    protected static interface Task {
+        public void run() throws Exception;
+    }
+
+    protected  void within(int time, TimeUnit unit, Task task) throws InterruptedException
{
+        long timeMS = unit.toMillis(time);
+        long deadline = System.currentTimeMillis() + timeMS;
+        while (true) {
+            try {
+                task.run();
+                return;
+            } catch (Throwable e) {
+                long remaining = deadline - System.currentTimeMillis();
+                if( remaining <=0 ) {
+                    if( e instanceof RuntimeException ) {
+                        throw (RuntimeException)e;
+                    }
+                    if( e instanceof Error ) {
+                        throw (Error)e;
+                    }
+                    throw new RuntimeException(e);
+                }
+                Thread.sleep(Math.min(timeMS/10, remaining));
+            }
+        }
+    }
+
+    protected MQTTClientProvider getMQTTClientProvider() {
+        return new FuseMQQTTClientProvider();
+    }
+
+    protected MQTT createMQTTConnection() throws Exception {
+        return createMQTTConnection(null, false);
+    }
+
+    protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception
{
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setTracer(createTracer());
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
+        mqtt.setHost("localhost", port);
+        return mqtt;
+    }
+
+    protected Tracer createTracer() {
+        return new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client Received:\n" + frame);
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client Sent:\n" + frame);
+            }
+
+            @Override
+            public void debug(String message, Object... args) {
+                LOG.info(String.format(message, args));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3653f81b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
new file mode 100644
index 0000000..ba2e964
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTests.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.ProtocolException;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTTests extends MQTTTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTTests.class);
+
+    @Test(timeout = 60 * 1000)
+    public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
+        MQTT mqttPub = createMQTTConnection("pub", true);
+        mqttPub.setUserName("admin");
+        mqttPub.setPassword("admin");
+
+        mqttPub.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client received: {}", frame);
+                if (frame.messageType() == CONNACK.TYPE) {
+                    CONNACK connAck = new CONNACK();
+                    try {
+                        connAck.decode(frame);
+                        LOG.info("{}", connAck);
+                        assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD,
connAck.code());
+                    } catch (ProtocolException e) {
+                        fail("Error decoding publish " + e.getMessage());
+                    }
+                }
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client sent: {}", frame);
+            }
+        });
+
+        BlockingConnection connectionPub = mqttPub.blockingConnection();
+        try {
+            connectionPub.connect();
+            fail("Should not be able to connect.");
+        } catch (Exception e) {}
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/3653f81b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/util/ResourceLoadingSslContext.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/util/ResourceLoadingSslContext.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/util/ResourceLoadingSslContext.java
new file mode 100644
index 0000000..01af49b
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/util/ResourceLoadingSslContext.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.util;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.annotation.PostConstruct;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.activemq.broker.SslContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.UrlResource;
+import org.springframework.util.ResourceUtils;
+
+/**
+ * Extends the SslContext so that it's easier to configure from spring.
+ */
+public class ResourceLoadingSslContext extends SslContext {
+
+    private String keyStoreType = "jks";
+    private String trustStoreType = "jks";
+
+    private String secureRandomAlgorithm = "SHA1PRNG";
+    private String keyStoreAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+    private String trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
+
+    private String keyStore;
+    private String trustStore;
+
+    private String keyStoreKeyPassword;
+    private String keyStorePassword;
+    private String trustStorePassword;
+
+    /**
+     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
+     *
+     * delegates to afterPropertiesSet, done to prevent backwards incompatible
+     * signature change.
+     */
+    @PostConstruct
+    private void postConstruct() {
+        try {
+            afterPropertiesSet();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     *
+     * @throws Exception
+     * @org.apache.xbean.InitMethod
+     */
+    public void afterPropertiesSet() throws Exception {
+        keyManagers.addAll(createKeyManagers());
+        trustManagers.addAll(createTrustManagers());
+        if (secureRandom == null) {
+            secureRandom = createSecureRandom();
+        }
+    }
+
+    private SecureRandom createSecureRandom() throws NoSuchAlgorithmException {
+        return SecureRandom.getInstance(secureRandomAlgorithm);
+    }
+
+    private Collection<TrustManager> createTrustManagers() throws Exception {
+        KeyStore ks = createTrustManagerKeyStore();
+        if (ks == null) {
+            return new ArrayList<TrustManager>(0);
+        }
+
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustStoreAlgorithm);
+        tmf.init(ks);
+        return Arrays.asList(tmf.getTrustManagers());
+    }
+
+    private Collection<KeyManager> createKeyManagers() throws Exception {
+        KeyStore ks = createKeyManagerKeyStore();
+        if (ks == null) {
+            return new ArrayList<KeyManager>(0);
+        }
+
+        KeyManagerFactory tmf = KeyManagerFactory.getInstance(keyStoreAlgorithm);
+        tmf.init(ks, keyStoreKeyPassword == null ? (keyStorePassword == null ? null : keyStorePassword.toCharArray())
: keyStoreKeyPassword.toCharArray());
+        return Arrays.asList(tmf.getKeyManagers());
+    }
+
+    private KeyStore createTrustManagerKeyStore() throws Exception {
+        if (trustStore == null) {
+            return null;
+        }
+
+        KeyStore ks = KeyStore.getInstance(trustStoreType);
+        InputStream is = resourceFromString(trustStore).getInputStream();
+        try {
+            ks.load(is, trustStorePassword == null ? null : trustStorePassword.toCharArray());
+        } finally {
+            is.close();
+        }
+        return ks;
+    }
+
+    private KeyStore createKeyManagerKeyStore() throws Exception {
+        if (keyStore == null) {
+            return null;
+        }
+
+        KeyStore ks = KeyStore.getInstance(keyStoreType);
+        InputStream is = resourceFromString(keyStore).getInputStream();
+        try {
+            ks.load(is, keyStorePassword == null ? null : keyStorePassword.toCharArray());
+        } finally {
+            is.close();
+        }
+        return ks;
+    }
+
+    public String getTrustStoreType() {
+        return trustStoreType;
+    }
+
+    public String getKeyStoreType() {
+        return keyStoreType;
+    }
+
+    public String getKeyStore() {
+        return keyStore;
+    }
+
+    public void setKeyStore(String keyStore) throws MalformedURLException {
+        this.keyStore = keyStore;
+    }
+
+    public String getTrustStore() {
+        return trustStore;
+    }
+
+    public void setTrustStore(String trustStore) throws MalformedURLException {
+        this.trustStore = trustStore;
+    }
+
+    public String getKeyStoreAlgorithm() {
+        return keyStoreAlgorithm;
+    }
+
+    public void setKeyStoreAlgorithm(String keyAlgorithm) {
+        this.keyStoreAlgorithm = keyAlgorithm;
+    }
+
+    public String getTrustStoreAlgorithm() {
+        return trustStoreAlgorithm;
+    }
+
+    public void setTrustStoreAlgorithm(String trustAlgorithm) {
+        this.trustStoreAlgorithm = trustAlgorithm;
+    }
+
+    public String getKeyStoreKeyPassword() {
+        return keyStoreKeyPassword;
+    }
+
+    public void setKeyStoreKeyPassword(String keyPassword) {
+        this.keyStoreKeyPassword = keyPassword;
+    }
+
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    public void setKeyStorePassword(String keyPassword) {
+        this.keyStorePassword = keyPassword;
+    }
+
+    public String getTrustStorePassword() {
+        return trustStorePassword;
+    }
+
+    public void setTrustStorePassword(String trustPassword) {
+        this.trustStorePassword = trustPassword;
+    }
+
+    public void setKeyStoreType(String keyType) {
+        this.keyStoreType = keyType;
+    }
+
+    public void setTrustStoreType(String trustType) {
+        this.trustStoreType = trustType;
+    }
+
+    public String getSecureRandomAlgorithm() {
+        return secureRandomAlgorithm;
+    }
+
+    public void setSecureRandomAlgorithm(String secureRandomAlgorithm) {
+        this.secureRandomAlgorithm = secureRandomAlgorithm;
+    }
+
+    public static Resource resourceFromString(String uri) throws MalformedURLException {
+        Resource resource;
+        File file = new File(uri);
+        if (file.exists()) {
+            resource = new FileSystemResource(uri);
+        } else if (ResourceUtils.isUrl(uri)) {
+            resource = new UrlResource(uri);
+        } else {
+            resource = new ClassPathResource(uri);
+        }
+        return resource;
+    }
+}


Mime
View raw message