activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1309566 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/mqtt/ main/resources/META-INF/services/org/apache/activemq/transport/ test/java/org/apache/activemq/transport/mqtt/
Date Wed, 04 Apr 2012 19:46:27 GMT
Author: rajdavies
Date: Wed Apr  4 19:46:26 2012
New Revision: 1309566

URL: http://svn.apache.org/viewvc?rev=1309566&view=rev
Log:
more functionality for MQTT for https://issues.apache.org/jira/browse/AMQ-3786

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java
      - copied, changed from r1308717, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl
      - copied, changed from r1308717, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+ssl
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java
      - copied, changed from r1308717, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Wed Apr  4 19:46:26 2012
@@ -61,10 +61,8 @@ public class MQTTInactivityMonitor exten
 
     private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
     private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
-    private boolean useKeepAlive = true;
     private boolean keepAliveResponseRequired;
 
-    protected WireFormat wireFormat;
 
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
@@ -99,7 +97,6 @@ public class MQTTInactivityMonitor exten
 
     public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
         super(next);
-        this.wireFormat = wireFormat;
     }
 
     public void start() throws Exception {
@@ -198,9 +195,6 @@ public class MQTTInactivityMonitor exten
         }
     }
 
-    public void setUseKeepAlive(boolean val) {
-        useKeepAlive = val;
-    }
 
     public long getReadCheckTime() {
         return readCheckTime;
@@ -231,7 +225,7 @@ public class MQTTInactivityMonitor exten
         return this.monitorStarted.get();
     }
 
-    protected synchronized void startMonitorThread() throws IOException {
+    synchronized void startMonitorThread() {
         if (monitorStarted.get()) {
             return;
         }
@@ -258,7 +252,7 @@ public class MQTTInactivityMonitor exten
     }
 
 
-    protected synchronized void stopMonitorThread() {
+    synchronized void stopMonitorThread() {
         if (monitorStarted.compareAndSet(true, false)) {
             if (readCheckerTask != null) {
                 readCheckerTask.cancel();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Wed Apr  4 19:46:26 2012
@@ -16,37 +16,30 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.*;
-import org.apache.activemq.transport.stomp.FrameTranslator;
-import org.apache.activemq.transport.stomp.LegacyFrameTranslator;
 import org.apache.activemq.transport.stomp.ProtocolException;
-import org.apache.activemq.transport.stomp.StompSubscription;
+import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.codec.CONNACK;
-import org.fusesource.mqtt.codec.CONNECT;
-import org.fusesource.mqtt.codec.DISCONNECT;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.fusesource.mqtt.codec.PINGREQ;
-import org.fusesource.mqtt.codec.PINGRESP;
-import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,66 +48,46 @@ class MQTTProtocolConverter {
     private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-
-    private static final String BROKER_VERSION;
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
 
-    static {
-        InputStream in = null;
-        String version = "5.6.0";
-        if ((in = MQTTProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt"))
!= null) {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-            try {
-                version = reader.readLine();
-            } catch (Exception e) {
-            }
-        }
-        BROKER_VERSION = version;
-    }
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
     private final ProducerId producerId = new ProducerId(sessionId, 1);
-
-    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new
ConcurrentHashMap<Integer, ResponseHandler>();
-    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, StompSubscription>();
-    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new
ConcurrentHashMap<String, StompSubscription>();
+    private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
+    private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic
= new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
     private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations =
new ConcurrentHashMap<String, ActiveMQDestination>();
     private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap =
new ConcurrentHashMap<String, String>();
-    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String,
LocalTransactionId>();
     private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer,
ActiveMQTopic>();
     private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination,
UTF8Buffer>();
+    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
     private final MQTTTransport mqttTransport;
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
-    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
-    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
-    private final BrokerContext brokerContext;
-    private String version = "1.0";
-    ConnectionInfo connectionInfo = new ConnectionInfo();
+    private ConnectionInfo connectionInfo = new ConnectionInfo();
     private CONNECT connect;
     private String clientId;
+    private final String QOS_PROPERTY_NAME = "QoSPropertyName";
 
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext)
{
         this.mqttTransport = mqttTransport;
-        this.brokerContext = brokerContext;
     }
 
-    protected int generateCommandId() {
+    int generateCommandId() {
         synchronized (commnadIdMutex) {
             return lastCommandId++;
         }
     }
 
 
-    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+    void sendToActiveMQ(Command command, ResponseHandler handler) {
         command.setCommandId(generateCommandId());
         if (handler != null) {
             command.setResponseRequired(true);
@@ -123,6 +96,14 @@ class MQTTProtocolConverter {
         mqttTransport.sendToActiveMQ(command);
     }
 
+    void sendToMQTT(MQTTFrame frame) {
+        try {
+            mqttTransport.sendToMQTT(frame);
+        } catch (IOException e) {
+            LOG.warn("Failed to send frame " + frame, e);
+        }
+    }
+
 
     /**
      * Convert a MQTT command
@@ -138,6 +119,7 @@ class MQTTProtocolConverter {
             }
             case CONNECT.TYPE: {
                 onMQTTConnect(new CONNECT().decode(frame));
+                LOG.debug("MQTT Client " + getClientId() + " connected.");
                 break;
             }
             case DISCONNECT.TYPE: {
@@ -145,6 +127,22 @@ class MQTTProtocolConverter {
                 stopTransport();
                 break;
             }
+            case SUBSCRIBE.TYPE: {
+                onSubscribe(new SUBSCRIBE().decode(frame));
+                break;
+            }
+            case UNSUBSCRIBE.TYPE: {
+                onUnSubscribe(new UNSUBSCRIBE().decode(frame));
+                break;
+            }
+            case PUBLISH.TYPE: {
+                onMQTTPublish(new PUBLISH().decode(frame));
+                break;
+            }
+            case PUBACK.TYPE: {
+                onMQTTPubAck(new PUBACK().decode(frame));
+                break;
+            }
             default:
                 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(),
true), frame);
         }
@@ -152,7 +150,7 @@ class MQTTProtocolConverter {
     }
 
 
-    protected void onMQTTConnect(final CONNECT connect) throws ProtocolException {
+    void onMQTTConnect(final CONNECT connect) throws ProtocolException {
 
         if (connected.get()) {
             throw new ProtocolException("All ready connected.");
@@ -171,15 +169,13 @@ class MQTTProtocolConverter {
         String passswd = "";
         if (connect.password() != null) {
             passswd = connect.password().toString();
-
         }
 
-
         configureInactivityMonitor(connect.keepAlive());
 
 
         connectionInfo.setConnectionId(connectionId);
-        if (clientId != null && clientId.isEmpty() == false) {
+        if (clientId != null && !clientId.isEmpty()) {
             connectionInfo.setClientId(clientId);
         } else {
             connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
@@ -222,6 +218,7 @@ class MQTTProtocolConverter {
 
                         CONNACK ack = new CONNACK();
                         ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
+                        connected.set(true);
                         getMQTTTransport().sendToMQTT(ack.encode());
 
                     }
@@ -231,13 +228,88 @@ class MQTTProtocolConverter {
         });
     }
 
+    void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
+        checkConnected();
+        SUBACK result = new SUBACK();
+        Topic[] topics = command.topics();
+        if (topics != null) {
+            byte[] qos = new byte[topics.length];
+            for (int i = 0; i < topics.length; i++) {
+                qos[i] = (byte) onSubscribe(command, topics[i]).ordinal();
+            }
+            SUBACK ack = new SUBACK();
+            ack.messageId(command.messageId());
+            ack.grantedQos(qos);
+            try {
+                getMQTTTransport().sendToMQTT(ack.encode());
+            } catch (IOException e) {
+                LOG.warn("Couldn't send SUBACK for " + command, e);
+            }
+        } else {
+            LOG.warn("No topics defined for Subscription " + command);
+        }
+    }
+
+    QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
+        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
+
+
+        if (destination == null) {
+            throw new MQTTProtocolException("Invalid Destination.");
+        }
+
+        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo(id);
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(1000);
+        consumerInfo.setDispatchAsync(true);
+        if (!connect.cleanSession() && (connect.clientId() != null)) {
+            //by default subscribers are persistent
+            consumerInfo.setSubscriptionName(connect.clientId().toString());
+        }
+
+        MQTTSubscription mqttSubscription = new MQTTSubscription(this, command.qos(), consumerInfo);
+
+
+        subscriptionsByConsumerId.put(id, mqttSubscription);
+        mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+
+        sendToActiveMQ(consumerInfo, null);
+        return topic.qos();
+    }
+
+    void onUnSubscribe(UNSUBSCRIBE command) {
+        UTF8Buffer[] topics = command.topics();
+        if (topics != null) {
+            for (int i = 0; i < topics.length; i++) {
+                onUnSubscribe(topics[i]);
+            }
+        }
+        UNSUBACK ack = new UNSUBACK();
+        ack.messageId(command.messageId());
+        sendToMQTT(ack.encode());
+
+    }
+
+    void onUnSubscribe(UTF8Buffer topicName) {
+        MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
+        if (subs != null) {
+            ConsumerInfo info = subs.getConsumerInfo();
+            if (info != null) {
+                subscriptionsByConsumerId.remove(info.getConsumerId());
+            }
+            RemoveInfo removeInfo = info.createRemoveCommand();
+            sendToActiveMQ(removeInfo, null);
+        }
+    }
+
 
     /**
      * Dispatch a ActiveMQ command
      */
 
 
-    public void onActiveMQCommand(Command command) throws IOException, JMSException {
+    public void onActiveMQCommand(Command command) throws Exception {
         if (command.isResponse()) {
             Response response = (Response) command;
             ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
@@ -252,28 +324,59 @@ class MQTTProtocolConverter {
             }
         } else if (command.isMessageDispatch()) {
             MessageDispatch md = (MessageDispatch) command;
-            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
+            MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
-                //sub.onMessageDispatch(md);
+                MessageAck ack = sub.createMessageAck(md);
+                PUBLISH publish = convertMessage((ActiveMQMessage) md.getMessage());
+                if (ack != null) {
+                    synchronized (consumerAcks) {
+                        consumerAcks.put(publish.messageId(), ack);
+                    }
+                }
+                getMQTTTransport().sendToMQTT(publish.encode());
             }
         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE)
{
             // Pass down any unexpected async errors. Should this close the connection?
             Throwable exception = ((ConnectionError) command).getException();
             handleException(exception, null);
+        } else if (command.isBrokerInfo()) {
+            //ignore
         } else {
             LOG.debug("Do not know how to process ActiveMQ Command " + command);
         }
     }
 
+    void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
+        checkConnected();
+        ActiveMQMessage message = convertMessage(command);
+        message.setProducerId(producerId);
+        message.onSend();
+        sendToActiveMQ(message, createResponseHandler(command));
+    }
 
-    public ActiveMQMessage convertMessage(PUBLISH command) throws IOException, JMSException
{
+    void onMQTTPubAck(PUBACK command) {
+        short messageId = command.messageId();
+        MessageAck ack = null;
+        synchronized (consumerAcks) {
+            ack = consumerAcks.remove(messageId);
+        }
+        if (ack != null) {
+            getMQTTTransport().sendToActiveMQ(ack);
+        }
+    }
+
+
+    ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
         ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
-        StringBuilder msgId = new StringBuilder();
-        msgId.append("ID:").append(getClientId()).append(":").append(command.messageId());
-        msg.setJMSMessageID(msgId.toString());
-        msg.setJMSPriority(4);
 
-        //ActiveMQTopic topic = new ActiveMQTopic(topicName);
+        msg.setProducerId(producerId);
+        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+        msg.setMessageId(id);
+        msg.setTimestamp(System.currentTimeMillis());
+        msg.setPriority((byte) Message.DEFAULT_PRIORITY);
+        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
+        msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
+
         ActiveMQTopic topic = null;
         synchronized (activeMQTopicMap) {
             topic = activeMQTopicMap.get(command.topicName());
@@ -288,36 +391,48 @@ class MQTTProtocolConverter {
         return msg;
     }
 
-    public MQTTFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException
{
+    public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException,
DataFormatException {
         PUBLISH result = new PUBLISH();
-        String msgId = message.getJMSMessageID();
-        int offset = msgId.lastIndexOf(':');
+        short id = (short) message.getMessageId().getProducerSequenceId();
+        result.messageId(id);
+        QoS qoS;
+        if (message.propertyExists(QOS_PROPERTY_NAME)) {
+            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
+            qoS = QoS.values()[ordinal];
 
-        short id = 0;
-        if (offset > 0) {
-            Short.parseShort(msgId.substring(offset, msgId.length() - 1));
+        } else {
+            qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
         }
-        result.messageId(id);
+        result.qos(qoS);
 
-        UTF8Buffer topicName = null;
+        UTF8Buffer topicName;
         synchronized (mqttTopicMap) {
             topicName = mqttTopicMap.get(message.getJMSDestination());
             if (topicName == null) {
-                topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replaceAll(".",
"/"));
+                topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replace('.',
'/'));
                 mqttTopicMap.put(message.getJMSDestination(), topicName);
             }
         }
         result.topicName(topicName);
 
-        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+        ByteSequence byteSequence = message.getContent();
+        if (message.isCompressed()) {
+            Inflater inflater = new Inflater();
+            inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
+            byte[] data = new byte[4096];
+            int read;
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            while ((read = inflater.inflate(data, 0, data.length)) != 0) {
+                bytesOut.write(data, 0, read);
+            }
+            byteSequence = bytesOut.toByteSequence();
+        }
 
-            if (!message.isCompressed() && message.getContent() != null) {
-                ByteSequence msgContent = message.getContent();
-                if (msgContent.getLength() > 4) {
-                    byte[] content = new byte[msgContent.getLength() - 4];
-                    System.arraycopy(msgContent.data, 4, content, 0, content.length);
-                    result.payload(new Buffer(content));
-                }
+        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+            if (byteSequence.getLength() > 4) {
+                byte[] content = new byte[byteSequence.getLength() - 4];
+                System.arraycopy(byteSequence.data, 4, content, 0, content.length);
+                result.payload(new Buffer(content));
             } else {
                 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
                 String messageText = msg.getText();
@@ -334,9 +449,11 @@ class MQTTProtocolConverter {
             msg.readBytes(data);
             result.payload(new Buffer(data));
         } else {
-            LOG.debug("Cannot convert " + message + " to a MQTT PUBLISH");
+            if (byteSequence != null && byteSequence.getLength() > 0) {
+                result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
+            }
         }
-        return result.encode();
+        return result;
     }
 
 
@@ -359,12 +476,8 @@ class MQTTProtocolConverter {
         return rc;
     }
 
-    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
-        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
-    }
-
 
-    protected void configureInactivityMonitor(short heartBeat) throws ProtocolException {
+    void configureInactivityMonitor(short heartBeat) {
         try {
 
             int heartBeatMS = heartBeat * 1000;
@@ -372,18 +485,17 @@ class MQTTProtocolConverter {
 
             monitor.setReadCheckTime(heartBeatMS);
             monitor.setInitialDelayTime(heartBeatMS);
-
             monitor.startMonitorThread();
 
         } catch (Exception ex) {
-
+            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
         }
 
         LOG.debug(getClientId() + " MQTT Connection using heart beat of  " + heartBeat +
" secs");
     }
 
 
-    protected void handleException(Throwable exception, MQTTFrame command) throws IOException
{
+    void handleException(Throwable exception, MQTTFrame command) {
         LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
         if (LOG.isDebugEnabled()) {
             LOG.debug("Exception detail", exception);
@@ -396,6 +508,12 @@ class MQTTProtocolConverter {
         }
     }
 
+    void checkConnected() throws MQTTProtocolException {
+        if (!connected.get()) {
+            throw new MQTTProtocolException("Not connected.");
+        }
+    }
+
     private String getClientId() {
         if (clientId == null) {
             if (connect != null && connect.clientId() != null) {
@@ -414,4 +532,67 @@ class MQTTProtocolConverter {
             LOG.debug("Failed to stop MQTT transport ", e);
         }
     }
+
+    ResponseHandler createResponseHandler(final PUBLISH command) {
+
+        if (command != null) {
+            switch (command.qos()) {
+                case AT_LEAST_ONCE:
+                    return new ResponseHandler() {
+                        public void onResponse(MQTTProtocolConverter converter, Response
response) throws IOException {
+                            if (response.isException()) {
+                                LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse)
response).getException());
+                            } else {
+                                PUBACK ack = new PUBACK();
+                                ack.messageId(command.messageId());
+                                converter.getMQTTTransport().sendToMQTT(ack.encode());
+                            }
+                        }
+                    };
+                case EXACTLY_ONCE:
+                    return new ResponseHandler() {
+                        public void onResponse(MQTTProtocolConverter converter, Response
response) throws IOException {
+                            if (response.isException()) {
+                                LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse)
response).getException());
+                            } else {
+                                PUBACK ack = new PUBACK();
+                                ack.messageId(command.messageId());
+                                converter.getMQTTTransport().sendToMQTT(ack.encode());
+                            }
+                        }
+                    };
+                case AT_MOST_ONCE:
+                    break;
+            }
+        }
+        /*
+        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        if (receiptId != null) {
+            return new ResponseHandler() {
+                public void onResponse(ProtocolConverter converter, Response response) throws
IOException {
+                    if (response.isException()) {
+                        // Generally a command can fail.. but that does not invalidate the
connection.
+                        // We report back the failure but we don't close the connection.
+                        Throwable exception = ((ExceptionResponse)response).getException();
+                        handleException(exception, command);
+                    } else {
+                        StompFrame sc = new StompFrame();
+                        sc.setAction(Stomp.Responses.RECEIPT);
+                        sc.setHeaders(new HashMap<String, String>(1));
+                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                        stompTransport.sendToStomp(sc);
+                    }
+                }
+            };
+        }
+        */
+        return null;
+    }
+
+    private String convertMQTTToActiveMQ(String name) {
+        String result = name.replace('>', '#');
+        result = result.replace('*', '+');
+        result = result.replace('.', '/');
+        return result;
+    }
 }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java (from r1308717, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java&r1=1308717&r2=1309566&rev=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java Wed Apr  4 19:46:26 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -29,21 +29,20 @@ import org.apache.activemq.util.Introspe
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * A <a href="http://stomp.codehaus.org/">STOMP</a> over SSL transport factory
- *
- *
+ * A <a href="http://mqtt.org/">MQTT</a> over SSL transport factory
  */
-public class StompSslTransportFactory extends SslTransportFactory implements BrokerServiceAware
{
+public class MQTTSslTransportFactory extends SslTransportFactory implements BrokerServiceAware
{
 
     private BrokerContext brokerContext = null;
 
     protected String getDefaultWireFormatType() {
-        return "stomp";
+        return "mqtt";
     }
 
     @SuppressWarnings("rawtypes")
+
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
-        transport = new StompTransportFilter(transport, format, brokerContext);
+        transport = new MQTTTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
@@ -65,4 +64,13 @@ public class StompSslTransportFactory ex
         this.brokerContext = brokerService.getBrokerContext();
     }
 
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
+
+        MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+
+        return monitor;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java Wed Apr  4 19:46:26 2012
@@ -16,70 +16,40 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-
-import javax.jms.JMSException;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.client.QoS;
 
 /**
- * Keeps track of the STOMP subscription so that acking is correctly done.
+ * Keeps track of the MQTT client subscription so that acking is correctly done.
  */
-public class MQTTSubscription {
-
-
-    protected final MQTTProtocolConverter protocolConverter;
-    protected final String subscriptionId;
-    protected final ConsumerInfo consumerInfo;
+class MQTTSubscription {
+    private final MQTTProtocolConverter protocolConverter;
 
-    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
-    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
+    private final ConsumerInfo consumerInfo;
+    private ActiveMQDestination destination;
+    private final QoS qos;
 
-    protected ActiveMQDestination destination;
-    protected String transformation;
-
-    public MQTTSubscription(MQTTProtocolConverter protocolConverter, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+    public MQTTSubscription(MQTTProtocolConverter protocolConverter, QoS qos, ConsumerInfo consumerInfo) {
         this.protocolConverter = protocolConverter;
-        this.subscriptionId = subscriptionId;
         this.consumerInfo = consumerInfo;
-        this.transformation = transformation;
+        this.qos = qos;
     }
 
-    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
-        ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
-        /*
-        if (ackMode == CLIENT_ACK) {
-            synchronized (this) {
-                dispatchedMessage.put(message.getMessageId(), md);
-            }
-        } else if (ackMode == INDIVIDUAL_ACK) {
-            synchronized (this) {
-                dispatchedMessage.put(message.getMessageId(), md);
+    MessageAck createMessageAck(MessageDispatch md) {
+
+        switch (qos) {
+            case AT_MOST_ONCE: {
+                return null;
             }
-        } else if (ackMode == AUTO_ACK) {
-            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
-            protocolConverter.getStompTransport().sendToActiveMQ(ack);
-        }
-        */
-        MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
-        protocolConverter.getMQTTTransport().sendToActiveMQ(ack);
 
-        MQTTFrame command = protocolConverter.convertMessage(message);
-        protocolConverter.getMQTTTransport().sendToMQTT(command);
+        }
+        return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
     }
 
 
-    public String getSubscriptionId() {
-        return subscriptionId;
-    }
-
     public void setDestination(ActiveMQDestination destination) {
         this.destination = destination;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java Wed Apr  4 19:46:26 2012
@@ -29,7 +29,7 @@ import org.apache.activemq.util.Introspe
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
+ * A <a href="http://mqtt.org/">MQTT</a> transport factory
  */
 public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Wed Apr  4 19:46:26 2012
@@ -36,9 +36,7 @@ import org.slf4j.LoggerFactory;
  * The StompTransportFilter normally sits on top of a TcpTransport that has been
  * configured with the StompWireFormat and is used to convert STOMP commands to
  * ActiveMQ commands. All of the conversion work is done by delegating to the
- * MQTTProtocolConverter.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
+ * MQTTProtocolConverter
  */
 public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
     private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
@@ -62,7 +60,7 @@ public class MQTTTransportFilter extends
         try {
             final Command command = (Command) o;
             protocolConverter.onActiveMQCommand(command);
-        } catch (JMSException e) {
+        } catch (Exception e) {
             throw IOExceptionSupport.create(e);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html Wed Apr  4 19:46:26 2012
@@ -19,7 +19,7 @@
 </head>
 <body>
 
-An implementation of the MQTT 3.1 protocol - see http://mqtt.org/
+A Broker side implementation of the MQTT 3.1 protocol - see http://mqtt.org/
 
 </body>
 </html>

Copied: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl
(from r1308717, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+ssl)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bssl?p2=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt%2Bssl&p1=activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bssl&r1=1308717&r2=1309566&rev=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+ssl (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt+ssl Wed Apr  4 19:46:26 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.stomp.StompSslTransportFactory
+class=org.apache.activemq.transport.mqtt.MQTTSslTransportFactory

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java?rev=1309566&r1=1309565&r2=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java Wed Apr  4 19:46:26 2012
@@ -27,7 +27,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// https://issues.apache.org/jira/browse/AMQ-3393
+
 public class MQTTConnectTest {
     private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
     BrokerService brokerService;
@@ -54,7 +54,7 @@ public class MQTTConnectTest {
         brokerService.addConnector("mqtt://localhost:1883");
         brokerService.start();
         MQTT mqtt = new MQTT();
-        mqtt.setHost("localhost",1883);
+        mqtt.setHost("localhost", 1883);
         BlockingConnection connection = mqtt.blockingConnection();
 
         connection.connect();
@@ -62,5 +62,5 @@ public class MQTTConnectTest {
         connection.disconnect();
     }
 
-    
+
 }
\ No newline at end of file

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java
(from r1308717, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java&r1=1308717&r2=1309566&rev=1309566&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLConnectTest.java Wed Apr  4 19:46:26 2012
@@ -16,8 +16,15 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.Vector;
 
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
 import org.apache.activemq.broker.BrokerService;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
@@ -27,14 +34,21 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// https://issues.apache.org/jira/browse/AMQ-3393
-public class MQTTConnectTest {
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
+
+public class MQTTSSLConnectTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTSSLConnectTest.class);
     BrokerService brokerService;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
     @Before
     public void startBroker() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
         exceptions.clear();
         brokerService = new BrokerService();
         brokerService.setPersistent(false);
@@ -51,10 +65,13 @@ public class MQTTConnectTest {
     @Test
     public void testConnect() throws Exception {
 
-        brokerService.addConnector("mqtt://localhost:1883");
+        brokerService.addConnector("mqtt+ssl://localhost:8883");
         brokerService.start();
         MQTT mqtt = new MQTT();
-        mqtt.setHost("localhost",1883);
+        mqtt.setHost("ssl://localhost:8883");
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+        mqtt.setSslContext(ctx);
         BlockingConnection connection = mqtt.blockingConnection();
 
         connection.connect();
@@ -62,5 +79,17 @@ public class MQTTConnectTest {
         connection.disconnect();
     }
 
-    
+
+    private static class DefaultTrustManager implements X509TrustManager {
+
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        }
+
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+        }
+
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
 }
\ No newline at end of file



Mime
View raw message