activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1304984 [1/2] - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/transport/mqtt/ src/main/resources/META-INF/services/org/apache/activemq/transport/ src/main/resources/META-INF/services/org/apache/activemq/wireformat/...
Date Sun, 25 Mar 2012 06:33:50 GMT
Author: rajdavies
Date: Sun Mar 25 06:33:49 2012
New Revision: 1304984

URL: http://svn.apache.org/viewvc?rev=1304984&view=rev
Log:
initial files to support MQTT - see https://issues.apache.org/jira/browse/AMQ-3786

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java   (contents, props changed)
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java   (contents, props changed)
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java   (contents, props changed)
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/ResponseHandler.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/WildCardConvertor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/package.html
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/package.html
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/mqtt
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/mqtt
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
      - copied, changed from r1303689, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified:
    activemq/trunk/activemq-core/pom.xml

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1304984&r1=1304983&r2=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Sun Mar 25 06:33:49 2012
@@ -96,6 +96,10 @@
       <groupId>org.fusesource.fuse-extra</groupId>
       <artifactId>fusemq-leveldb</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.mqtt-client</groupId>
+      <artifactId>mqtt-client</artifactId>
+    </dependency>
 
     <!-- =============================== -->
     <!-- Optional Dependencies           -->

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1304984&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Sun Mar 25 06:33:49 2012
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.mqtt;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.transport.AbstractInactivityMonitor;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTInactivityMonitor extends TransportFilter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
+
+    private static ThreadPoolExecutor ASYNC_TASKS;
+    private static int CHECKER_COUNTER;
+    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
+    private static Timer READ_CHECK_TIMER;
+
+    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandSent = new AtomicBoolean(false);
+    private final AtomicBoolean inSend = new AtomicBoolean(false);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+
+    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
+    private final AtomicBoolean inReceive = new AtomicBoolean(false);
+    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
+
+    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
+    private SchedulerTimerTask readCheckerTask;
+
+    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
+    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
+    private boolean useKeepAlive = true;
+    private boolean keepAliveResponseRequired;
+
+    protected WireFormat wireFormat;
+
+    private final Runnable readChecker = new Runnable() {
+        long lastRunTime;
+
+        public void run() {
+            long now = System.currentTimeMillis();
+            long elapsed = (now - lastRunTime);
+
+            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
+                LOG.debug("" + elapsed + " ms elapsed since last read check.");
+            }
+
+            // Perhaps the timer executed a read check late.. and then executes
+            // the next read check on time which causes the time elapsed between
+            // read checks to be small..
+
+            // If less than 90% of the read check Time elapsed then abort this readcheck.
+            if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
+                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
+                return;
+            }
+
+            lastRunTime = now;
+            readCheck();
+        }
+    };
+
+    private boolean allowReadCheck(long elapsed) {
+        return elapsed > (readCheckTime * 9 / 10);
+    }
+
+
+    public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
+        super(next);
+        this.wireFormat = wireFormat;
+    }
+
+    public void start() throws Exception {
+        next.start();
+        startMonitorThread();
+    }
+
+    public void stop() throws Exception {
+        stopMonitorThread();
+        next.stop();
+    }
+
+
+    final void readCheck() {
+        int currentCounter = next.getReceiveCounter();
+        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
+        if (inReceive.get() || currentCounter != previousCounter) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("A receive is in progress");
+            }
+            return;
+        }
+        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            }
+            ASYNC_TASKS.execute(new Runnable() {
+                public void run() {
+                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
+                }
+
+                ;
+            });
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message received since last read check, resetting flag: ");
+            }
+        }
+        commandReceived.set(false);
+    }
+
+
+    public void onCommand(Object command) {
+        commandReceived.set(true);
+        inReceive.set(true);
+        try {
+            if (command.getClass() == KeepAliveInfo.class) {
+                KeepAliveInfo info = (KeepAliveInfo) command;
+                if (info.isResponseRequired()) {
+                    sendLock.readLock().lock();
+                    try {
+                        info.setResponseRequired(false);
+                        oneway(info);
+                    } catch (IOException e) {
+                        onException(e);
+                    } finally {
+                        sendLock.readLock().unlock();
+                    }
+                }
+            } else {
+                transportListener.onCommand(command);
+            }
+        } finally {
+            inReceive.set(false);
+        }
+    }
+
+    public void oneway(Object o) throws IOException {
+        // To prevent the inactivity monitor from sending a message while we
+        // are performing a send we take a read lock.  The inactivity monitor
+        // sends its Heart-beat commands under a write lock.  This means that
+        // the MutexTransport is still responsible for synchronizing sends
+        this.sendLock.readLock().lock();
+        inSend.set(true);
+        try {
+            doOnewaySend(o);
+        } finally {
+            commandSent.set(true);
+            inSend.set(false);
+            this.sendLock.readLock().unlock();
+        }
+    }
+
+    // Must be called under lock, either read or write on sendLock.
+    private void doOnewaySend(Object command) throws IOException {
+        if (failed.get()) {
+            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
+        }
+        next.oneway(command);
+    }
+
+    public void onException(IOException error) {
+        if (failed.compareAndSet(false, true)) {
+            stopMonitorThread();
+            transportListener.onException(error);
+        }
+    }
+
+    public void setUseKeepAlive(boolean val) {
+        useKeepAlive = val;
+    }
+
+    public long getReadCheckTime() {
+        return readCheckTime;
+    }
+
+    public void setReadCheckTime(long readCheckTime) {
+        this.readCheckTime = readCheckTime;
+    }
+
+
+    public long getInitialDelayTime() {
+        return initialDelayTime;
+    }
+
+    public void setInitialDelayTime(long initialDelayTime) {
+        this.initialDelayTime = initialDelayTime;
+    }
+
+    public boolean isKeepAliveResponseRequired() {
+        return this.keepAliveResponseRequired;
+    }
+
+    public void setKeepAliveResponseRequired(boolean value) {
+        this.keepAliveResponseRequired = value;
+    }
+
+    public boolean isMonitorStarted() {
+        return this.monitorStarted.get();
+    }
+
+    protected synchronized void startMonitorThread() throws IOException {
+        if (monitorStarted.get()) {
+            return;
+        }
+
+
+        if (readCheckTime > 0) {
+            readCheckerTask = new SchedulerTimerTask(readChecker);
+        }
+
+
+        if (readCheckTime > 0) {
+            monitorStarted.set(true);
+            synchronized (AbstractInactivityMonitor.class) {
+                if (CHECKER_COUNTER == 0) {
+                    ASYNC_TASKS = createExecutor();
+                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
+                }
+                CHECKER_COUNTER++;
+                if (readCheckTime > 0) {
+                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
+                }
+            }
+        }
+    }
+
+
+    protected synchronized void stopMonitorThread() {
+        if (monitorStarted.compareAndSet(true, false)) {
+            if (readCheckerTask != null) {
+                readCheckerTask.cancel();
+            }
+
+            synchronized (AbstractInactivityMonitor.class) {
+                READ_CHECK_TIMER.purge();
+                CHECKER_COUNTER--;
+                if (CHECKER_COUNTER == 0) {
+                    READ_CHECK_TIMER.cancel();
+                    READ_CHECK_TIMER = null;
+                    ASYNC_TASKS.shutdown();
+                    ASYNC_TASKS = null;
+                }
+            }
+        }
+    }
+
+    private ThreadFactory factory = new ThreadFactory() {
+        public Thread newThread(Runnable runnable) {
+            Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
+            thread.setDaemon(true);
+            return thread;
+        }
+    };
+
+    private ThreadPoolExecutor createExecutor() {
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
+        exec.allowCoreThreadTimeOut(true);
+        return exec;
+    }
+}
+

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Sun Mar 25 06:33:49 2012
@@ -14,42 +14,59 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import org.apache.activemq.broker.BrokerContext;
-import org.apache.activemq.broker.BrokerContextAware;
 import org.apache.activemq.command.*;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.*;
+import org.apache.activemq.transport.stomp.FrameTranslator;
+import org.apache.activemq.transport.stomp.LegacyFrameTranslator;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.StompSubscription;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBLISH;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class ProtocolConverter {
+class MQTTProtocolConverter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
 
     private static final String BROKER_VERSION;
-    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
+    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
 
     static {
         InputStream in = null;
         String version = "5.6.0";
-        if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
+        if ((in = MQTTProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
             BufferedReader reader = new BufferedReader(new InputStreamReader(in));
             try {
                 version = reader.readLine();
-            } catch(Exception e) {
+            } catch (Exception e) {
             }
         }
         BROKER_VERSION = version;
@@ -70,7 +87,9 @@ public class ProtocolConverter {
     private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
     private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
-    private final StompTransport stompTransport;
+    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
+    private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
+    private final MQTTTransport mqttTransport;
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
@@ -79,12 +98,12 @@ public class ProtocolConverter {
     private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
     private final BrokerContext brokerContext;
     private String version = "1.0";
-    private long hbReadInterval;
-    private long hbWriteInterval;
-    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
+    ConnectionInfo connectionInfo = new ConnectionInfo();
+    private CONNECT connect;
+    private String clientId;
 
-    public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
-        this.stompTransport = stompTransport;
+    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
+        this.mqttTransport = mqttTransport;
         this.brokerContext = brokerContext;
     }
 
@@ -94,28 +113,6 @@ public class ProtocolConverter {
         }
     }
 
-    protected ResponseHandler createResponseHandler(final StompFrame command) {
-        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-        if (receiptId != null) {
-            return new ResponseHandler() {
-                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-                    if (response.isException()) {
-                        // Generally a command can fail.. but that does not invalidate the connection.
-                        // We report back the failure but we don't close the connection.
-                        Throwable exception = ((ExceptionResponse)response).getException();
-                        handleException(exception, command);
-                    } else {
-                        StompFrame sc = new StompFrame();
-                        sc.setAction(Stomp.Responses.RECEIPT);
-                        sc.setHeaders(new HashMap<String, String>(1));
-                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-                        stompTransport.sendToStomp(sc);
-                    }
-                }
-            };
-        }
-        return null;
-    }
 
     protected void sendToActiveMQ(Command command, ResponseHandler handler) {
         command.setCommandId(generateCommandId());
@@ -123,495 +120,87 @@ public class ProtocolConverter {
             command.setResponseRequired(true);
             resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
         }
-        stompTransport.sendToActiveMQ(command);
-    }
-
-    protected void sendToStomp(StompFrame command) throws IOException {
-        stompTransport.sendToStomp(command);
+        mqttTransport.sendToActiveMQ(command);
     }
 
-    protected FrameTranslator findTranslator(String header) {
-        FrameTranslator translator = frameTranslator;
-        try {
-            if (header != null) {
-                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
-                        .newInstance(header);
-                if (translator instanceof BrokerContextAware) {
-                    ((BrokerContextAware)translator).setBrokerContext(brokerContext);
-                }
-            }
-        } catch (Exception ignore) {
-            // if anything goes wrong use the default translator
-        }
-
-        return translator;
-    }
 
     /**
-     * Convert a stomp command
-     *
-     * @param command
+     * Convert a MQTT command
      */
-    public void onStompCommand(StompFrame command) throws IOException, JMSException {
-        try {
+    public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
 
-            if (command.getClass() == StompFrameError.class) {
-                throw ((StompFrameError)command).getException();
-            }
 
-            String action = command.getAction();
-            if (action.startsWith(Stomp.Commands.SEND)) {
-                onStompSend(command);
-            } else if (action.startsWith(Stomp.Commands.ACK)) {
-                onStompAck(command);
-            } else if (action.startsWith(Stomp.Commands.NACK)) {
-                onStompNack(command);
-            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
-                onStompBegin(command);
-            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
-                onStompCommit(command);
-            } else if (action.startsWith(Stomp.Commands.ABORT)) {
-                onStompAbort(command);
-            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
-                onStompSubscribe(command);
-            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
-                onStompUnsubscribe(command);
-            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
-                       action.startsWith(Stomp.Commands.STOMP)) {
-                onStompConnect(command);
-            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
-                onStompDisconnect(command);
-            } else {
-                throw new ProtocolException("Unknown STOMP action: " + action);
+        switch (frame.messageType()) {
+            case PINGREQ.TYPE: {
+                mqttTransport.sendToMQTT(PING_RESP_FRAME);
+                LOG.debug("Sent Ping Response to " + getClientId());
+                break;
             }
-
-        } catch (ProtocolException e) {
-            handleException(e, command);
-            // Some protocol errors can cause the connection to get closed.
-            if (e.isFatal()) {
-               getStompTransport().onException(e);
+            case CONNECT.TYPE: {
+                onMQTTConnect(new CONNECT().decode(frame));
+                break;
             }
-        }
-    }
-
-    protected void handleException(Throwable exception, StompFrame command) throws IOException {
-        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Exception detail", exception);
-        }
-
-        // Let the stomp client know about any protocol errors.
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
-        exception.printStackTrace(stream);
-        stream.close();
-
-        HashMap<String, String> headers = new HashMap<String, String>();
-        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
-        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
-
-        if (command != null) {
-            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if (receiptId != null) {
-                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            case DISCONNECT.TYPE: {
+                LOG.debug("MQTT Client " + getClientId() + " disconnecting");
+                stopTransport();
+                break;
             }
+            default:
+                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
         }
 
-        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
-        sendToStomp(errorMessage);
     }
 
-    protected void onStompSend(StompFrame command) throws IOException, JMSException {
-        checkConnected();
-
-        Map<String, String> headers = command.getHeaders();
-        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
-        if (destination == null) {
-            throw new ProtocolException("SEND received without a Destination specified!");
-        }
-
-        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
-        headers.remove("transaction");
-
-        ActiveMQMessage message = convertMessage(command);
-
-        message.setProducerId(producerId);
-        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
-        message.setMessageId(id);
-        message.setJMSTimestamp(System.currentTimeMillis());
 
-        if (stompTx != null) {
-            TransactionId activemqTx = transactions.get(stompTx);
-            if (activemqTx == null) {
-                throw new ProtocolException("Invalid transaction id: " + stompTx);
-            }
-            message.setTransactionId(activemqTx);
-        }
-
-        message.onSend();
-        sendToActiveMQ(message, createResponseHandler(command));
-    }
-
-    protected void onStompNack(StompFrame command) throws ProtocolException {
-
-        checkConnected();
-
-        if (this.version.equals(Stomp.V1_0)) {
-            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
-        }
-
-        Map<String, String> headers = command.getHeaders();
-
-        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
-        if (subscriptionId == null) {
-            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
-        }
-
-        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
-        if (messageId == null) {
-            throw new ProtocolException("NACK received without a message-id to acknowledge!");
-        }
-
-        TransactionId activemqTx = null;
-        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx != null) {
-            activemqTx = transactions.get(stompTx);
-            if (activemqTx == null) {
-                throw new ProtocolException("Invalid transaction id: " + stompTx);
-            }
-        }
-
-        if (subscriptionId != null) {
-            StompSubscription sub = this.subscriptions.get(subscriptionId);
-            if (sub != null) {
-                MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
-                if (ack != null) {
-                    sendToActiveMQ(ack, createResponseHandler(command));
-                } else {
-                    throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
-                }
-            }
-        }
-    }
-
-    protected void onStompAck(StompFrame command) throws ProtocolException {
-        checkConnected();
-
-        Map<String, String> headers = command.getHeaders();
-        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
-        if (messageId == null) {
-            throw new ProtocolException("ACK received without a message-id to acknowledge!");
-        }
-
-        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
-        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
-            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
-        }
-
-        TransactionId activemqTx = null;
-        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx != null) {
-            activemqTx = transactions.get(stompTx);
-            if (activemqTx == null) {
-                throw new ProtocolException("Invalid transaction id: " + stompTx);
-            }
-        }
-
-        boolean acked = false;
-
-        if (subscriptionId != null) {
-
-            StompSubscription sub = this.subscriptions.get(subscriptionId);
-            if (sub != null) {
-                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
-                if (ack != null) {
-                    sendToActiveMQ(ack, createResponseHandler(command));
-                    acked = true;
-                }
-            }
-
-        } else {
-
-            // TODO: acking with just a message id is very bogus since the same message id
-            // could have been sent to 2 different subscriptions on the same Stomp connection.
-            // For example, when 2 subs are created on the same topic.
-
-            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
-                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
-                if (ack != null) {
-                    sendToActiveMQ(ack, createResponseHandler(command));
-                    acked = true;
-                    break;
-                }
-            }
-        }
-
-        if (!acked) {
-            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
-        }
-    }
-
-    protected void onStompBegin(StompFrame command) throws ProtocolException {
-        checkConnected();
-
-        Map<String, String> headers = command.getHeaders();
-
-        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
-
-        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
-            throw new ProtocolException("Must specify the transaction you are beginning");
-        }
-
-        if (transactions.get(stompTx) != null) {
-            throw new ProtocolException("The transaction was allready started: " + stompTx);
-        }
-
-        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
-        transactions.put(stompTx, activemqTx);
-
-        TransactionInfo tx = new TransactionInfo();
-        tx.setConnectionId(connectionId);
-        tx.setTransactionId(activemqTx);
-        tx.setType(TransactionInfo.BEGIN);
-
-        sendToActiveMQ(tx, createResponseHandler(command));
-    }
-
-    protected void onStompCommit(StompFrame command) throws ProtocolException {
-        checkConnected();
-
-        Map<String, String> headers = command.getHeaders();
-
-        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx == null) {
-            throw new ProtocolException("Must specify the transaction you are committing");
-        }
-
-        TransactionId activemqTx = transactions.remove(stompTx);
-        if (activemqTx == null) {
-            throw new ProtocolException("Invalid transaction id: " + stompTx);
-        }
-
-        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
-            sub.onStompCommit(activemqTx);
-        }
-
-        TransactionInfo tx = new TransactionInfo();
-        tx.setConnectionId(connectionId);
-        tx.setTransactionId(activemqTx);
-        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
-
-        sendToActiveMQ(tx, createResponseHandler(command));
-    }
+    protected void onMQTTConnect(final CONNECT connect) throws ProtocolException {
 
-    protected void onStompAbort(StompFrame command) throws ProtocolException {
-        checkConnected();
-        Map<String, String> headers = command.getHeaders();
-
-        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
-        if (stompTx == null) {
-            throw new ProtocolException("Must specify the transaction you are committing");
-        }
-
-        TransactionId activemqTx = transactions.remove(stompTx);
-        if (activemqTx == null) {
-            throw new ProtocolException("Invalid transaction id: " + stompTx);
-        }
-        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
-            try {
-                sub.onStompAbort(activemqTx);
-            } catch (Exception e) {
-                throw new ProtocolException("Transaction abort failed", false, e);
-            }
-        }
-
-        TransactionInfo tx = new TransactionInfo();
-        tx.setConnectionId(connectionId);
-        tx.setTransactionId(activemqTx);
-        tx.setType(TransactionInfo.ROLLBACK);
-
-        sendToActiveMQ(tx, createResponseHandler(command));
-    }
-
-    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
-        checkConnected();
-        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
-        Map<String, String> headers = command.getHeaders();
-
-        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
-        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
-
-        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
-            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
-        }
-
-        ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
-
-        if (actualDest == null) {
-            throw new ProtocolException("Invalid Destination.");
-        }
-
-        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-        ConsumerInfo consumerInfo = new ConsumerInfo(id);
-        consumerInfo.setPrefetchSize(1000);
-        consumerInfo.setDispatchAsync(true);
-
-        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
-        if (browser != null && browser.equals(Stomp.TRUE)) {
-
-            if (!this.version.equals(Stomp.V1_1)) {
-                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
-            }
-
-            consumerInfo.setBrowser(true);
-        }
-
-        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
-        consumerInfo.setSelector(selector);
-
-        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
-
-        consumerInfo.setDestination(translator.convertDestination(this, destination, true));
-
-        StompSubscription stompSubscription;
-        if (!consumerInfo.isBrowser()) {
-            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
-        } else {
-            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
-        }
-        stompSubscription.setDestination(actualDest);
-
-        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
-        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
-            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
-        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
-            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
-        } else {
-            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
-        }
-
-        subscriptionsByConsumerId.put(id, stompSubscription);
-        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
-        if (subscriptionId != null) {
-            subscriptions.put(subscriptionId, stompSubscription);
-        }
-
-        // dispatch can beat the receipt so send it early
-        sendReceipt(command);
-        sendToActiveMQ(consumerInfo, null);
-    }
-
-    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
-        checkConnected();
-        Map<String, String> headers = command.getHeaders();
-
-        ActiveMQDestination destination = null;
-        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
-        if (o != null) {
-            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
-        }
-
-        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
-        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
-            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
+        if (connected.get()) {
+            throw new ProtocolException("All ready connected.");
         }
+        this.connect = connect;
 
-        if (subscriptionId == null && destination == null) {
-            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
+        String clientId = "";
+        if (connect.clientId() != null) {
+            clientId = connect.clientId().toString();
         }
 
-        // check if it is a durable subscription
-        String durable = command.getHeaders().get("activemq.subscriptionName");
-        if (durable != null) {
-            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
-            info.setClientId(durable);
-            info.setSubscriptionName(durable);
-            info.setConnectionId(connectionId);
-            sendToActiveMQ(info, createResponseHandler(command));
-            return;
+        String userName = "";
+        if (connect.userName() != null) {
+            userName = connect.userName().toString();
         }
+        String passswd = "";
+        if (connect.password() != null) {
+            passswd = connect.password().toString();
 
-        if (subscriptionId != null) {
-
-            StompSubscription sub = this.subscriptions.remove(subscriptionId);
-            if (sub != null) {
-                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
-                return;
-            }
-
-        } else {
-
-            // Unsubscribing using a destination is a bit weird if multiple subscriptions
-            // are created with the same destination.
-            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-                StompSubscription sub = iter.next();
-                if (destination != null && destination.equals(sub.getDestination())) {
-                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
-                    iter.remove();
-                    return;
-                }
-            }
-        }
-
-        throw new ProtocolException("No subscription matched.");
-    }
-
-    ConnectionInfo connectionInfo = new ConnectionInfo();
-
-    protected void onStompConnect(final StompFrame command) throws ProtocolException {
-
-        if (connected.get()) {
-            throw new ProtocolException("Allready connected.");
         }
 
-        final Map<String, String> headers = command.getHeaders();
-
-        // allow anyone to login for now
-        String login = headers.get(Stomp.Headers.Connect.LOGIN);
-        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
-        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
-        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
-        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
-
-        if (accepts == null) {
-            accepts = Stomp.DEFAULT_VERSION;
-        }
-        if (heartBeat == null) {
-            heartBeat = defaultHeartBeat;
-        }
 
-        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
-        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
-        if (acceptsVersions.isEmpty()) {
-            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
-                                        Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
-        } else {
-            this.version = Collections.max(acceptsVersions);
-        }
+        configureInactivityMonitor(connect.keepAlive());
 
-        configureInactivityMonitor(heartBeat);
 
-        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
         connectionInfo.setConnectionId(connectionId);
-        if (clientId != null) {
+        if (clientId != null && clientId.isEmpty() == false) {
             connectionInfo.setClientId(clientId);
         } else {
             connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
         }
 
         connectionInfo.setResponseRequired(true);
-        connectionInfo.setUserName(login);
-        connectionInfo.setPassword(passcode);
-        connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
+        connectionInfo.setUserName(userName);
+        connectionInfo.setPassword(passswd);
+        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
-            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
 
                 if (response.isException()) {
                     // If the connection attempt fails we close the socket.
-                    Throwable exception = ((ExceptionResponse)response).getException();
-                    handleException(exception, command);
-                    getStompTransport().onException(IOExceptionSupport.create(exception));
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    //let the client know
+                    CONNACK ack = new CONNACK();
+                    ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+                    getMQTTTransport().sendToMQTT(ack.encode());
+                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
                     return;
                 }
 
@@ -620,46 +209,21 @@ public class ProtocolConverter {
 
                 final ProducerInfo producerInfo = new ProducerInfo(producerId);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
-                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
 
                         if (response.isException()) {
                             // If the connection attempt fails we close the socket.
-                            Throwable exception = ((ExceptionResponse)response).getException();
-                            handleException(exception, command);
-                            getStompTransport().onException(IOExceptionSupport.create(exception));
+                            Throwable exception = ((ExceptionResponse) response).getException();
+                            CONNACK ack = new CONNACK();
+                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
+                            getMQTTTransport().sendToMQTT(ack.encode());
+                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
+                            // Stop here: without this return the refusal above would be
+                            // followed by a CONNECTION_ACCEPTED CONNACK below.
+                            return;
                         }
 
                         connected.set(true);
-                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
-
-                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
-                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
-                        if (requestId == null) {
-                            // TODO legacy
-                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
-                        }
-                        if (requestId != null) {
-                            // TODO legacy
-                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
-                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
-                        }
+                        // The broker accepted both the connection and the producer, so
+                        // acknowledge the CONNECT. The connected flag is kept set (line
+                        // above) so the duplicate-CONNECT guard in onMQTTConnect can fire.
+                        CONNACK ack = new CONNACK();
+                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
+                        getMQTTTransport().sendToMQTT(ack.encode());
 
-                        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
-                        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
-                                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
-                        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
-
-                        StompFrame sc = new StompFrame();
-                        sc.setAction(Stomp.Responses.CONNECTED);
-                        sc.setHeaders(responseHeaders);
-                        sendToStomp(sc);
-
-                        if (version.equals(Stomp.V1_1)) {
-                            StompWireFormat format = stompTransport.getWireFormat();
-                            if (format != null) {
-                                format.setEncodingEnabled(true);
-                            }
-                        }
                     }
                 });
 
@@ -667,73 +231,122 @@ public class ProtocolConverter {
         });
     }
 
-    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
-        checkConnected();
-        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
-        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
-        connected.set(false);
-    }
-
-    protected void checkConnected() throws ProtocolException {
-        if (!connected.get()) {
-            throw new ProtocolException("Not connected.");
-        }
-    }
 
     /**
      * Dispatch a ActiveMQ command
-     *
-     * @param command
-     * @throws IOException
      */
+
+
     public void onActiveMQCommand(Command command) throws IOException, JMSException {
         if (command.isResponse()) {
-            Response response = (Response)command;
+            Response response = (Response) command;
             ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
             if (rh != null) {
                 rh.onResponse(this, response);
             } else {
                 // Pass down any unexpected errors. Should this close the connection?
                 if (response.isException()) {
-                    Throwable exception = ((ExceptionResponse)response).getException();
+                    Throwable exception = ((ExceptionResponse) response).getException();
                     handleException(exception, null);
                 }
             }
         } else if (command.isMessageDispatch()) {
-            MessageDispatch md = (MessageDispatch)command;
+            MessageDispatch md = (MessageDispatch) command;
             StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
-                sub.onMessageDispatch(md);
+                //sub.onMessageDispatch(md);
             }
-        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
-            stompTransport.sendToStomp(ping);
         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
             // Pass down any unexpected async errors. Should this close the connection?
-            Throwable exception = ((ConnectionError)command).getException();
+            Throwable exception = ((ConnectionError) command).getException();
             handleException(exception, null);
+        } else {
+            LOG.debug("Do not know how to process ActiveMQ Command " + command);
         }
     }
 
-    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
-        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
+
+    public ActiveMQMessage convertMessage(PUBLISH command) throws IOException, JMSException {
+        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+        StringBuilder msgId = new StringBuilder();
+        msgId.append("ID:").append(getClientId()).append(":").append(command.messageId());
+        msg.setJMSMessageID(msgId.toString());
+        msg.setJMSPriority(4);
+
+        //ActiveMQTopic topic = new ActiveMQTopic(topicName);
+        ActiveMQTopic topic = null;
+        synchronized (activeMQTopicMap) {
+            topic = activeMQTopicMap.get(command.topicName());
+            if (topic == null) {
+                String topicName = command.topicName().toString().replaceAll("/", ".");
+                topic = new ActiveMQTopic(topicName);
+                activeMQTopicMap.put(command.topicName(), topic);
+            }
+        }
+        msg.setJMSDestination(topic);
+        msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
         return msg;
     }
 
-    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
-        if (ignoreTransformation == true) {
-            return frameTranslator.convertMessage(this, message);
+    /**
+     * Converts an ActiveMQ message into an encoded MQTT PUBLISH frame.
+     *
+     * The MQTT message id is taken from the text after the last ':' of the JMS
+     * message id and stays 0 when that text is missing or is not a valid short.
+     * The topic name is the destination's physical name with '.' replaced by
+     * '/', cached per destination in mqttTopicMap. Only text and bytes messages
+     * contribute a payload; other message types are logged at debug level and
+     * published without one.
+     *
+     * @param message the ActiveMQ message to convert
+     * @return the encoded PUBLISH frame
+     */
+    public MQTTFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+        PUBLISH result = new PUBLISH();
+        String msgId = message.getJMSMessageID();
+
+        short id = 0;
+        if (msgId != null) {
+            int offset = msgId.lastIndexOf(':');
+            if (offset > 0) {
+                try {
+                    // Skip the ':' itself and keep the final character of the id.
+                    id = Short.parseShort(msgId.substring(offset + 1));
+                } catch (NumberFormatException e) {
+                    // Ids whose last segment does not fit a short (presumably those
+                    // not created by this converter) fall back to 0.
+                    LOG.debug("Cannot derive a MQTT message id from " + msgId);
+                }
+            }
+        }
+        result.messageId(id);
+
+        UTF8Buffer topicName = null;
+        synchronized (mqttTopicMap) {
+            topicName = mqttTopicMap.get(message.getJMSDestination());
+            if (topicName == null) {
+                // Literal character replacement: replaceAll(".", ...) would treat "."
+                // as a regex and turn every character into '/'.
+                topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replace('.', '/'));
+                mqttTopicMap.put(message.getJMSDestination(), topicName);
+            }
+        }
+        result.topicName(topicName);
+
+        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+
+            if (!message.isCompressed() && message.getContent() != null) {
+                ByteSequence msgContent = message.getContent();
+                if (msgContent.getLength() > 4) {
+                    // Drop the first four bytes of the marshalled content
+                    // (presumably the length prefix of the text - confirm).
+                    byte[] content = new byte[msgContent.getLength() - 4];
+                    System.arraycopy(msgContent.data, 4, content, 0, content.length);
+                    result.payload(new Buffer(content));
+                }
+            } else {
+                ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+                String messageText = msg.getText();
+                if (messageText != null) {
+                    result.payload(new Buffer(messageText.getBytes("UTF-8")));
+                }
+            }
+
+        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+
+            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
+            msg.setReadOnlyBody(true);
+            byte[] data = new byte[(int) msg.getBodyLength()];
+            msg.readBytes(data);
+            result.payload(new Buffer(data));
         } else {
-            return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
+            LOG.debug("Cannot convert " + message + " to a MQTT PUBLISH");
         }
+        return result.encode();
     }
 
-    public StompTransport getStompTransport() {
-        return stompTransport;
+
+    public MQTTTransport getMQTTTransport() {
+        return mqttTransport;
     }
 
     public ActiveMQDestination createTempDestination(String name, boolean topic) {
         ActiveMQDestination rc = tempDestinations.get(name);
-        if( rc == null ) {
+        if (rc == null) {
             if (topic) {
                 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
             } else {
@@ -750,62 +363,55 @@ public class ProtocolConverter {
         return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
     }
 
-    public String getDefaultHeartBeat() {
-        return defaultHeartBeat;
-    }
 
-    public void setDefaultHeartBeat(String defaultHeartBeat) {
-        this.defaultHeartBeat = defaultHeartBeat;
-    }
+    /**
+     * Configures and starts the transport's inactivity monitor.
+     *
+     * The value is converted from seconds to milliseconds and used as both the
+     * read-check time and the initial delay. A failure to configure or start
+     * the monitor is logged and otherwise ignored.
+     *
+     * @param heartBeat keep-alive interval in seconds
+     */
+    protected void configureInactivityMonitor(short heartBeat) throws ProtocolException {
+        try {
 
-    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
+            int heartBeatMS = heartBeat * 1000;
+            MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
 
-        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
+            monitor.setReadCheckTime(heartBeatMS);
+            monitor.setInitialDelayTime(heartBeatMS);
 
-        if (keepAliveOpts == null || keepAliveOpts.length != 2) {
-            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
-        } else {
+            monitor.startMonitorThread();
 
-            try {
-                hbReadInterval = Long.parseLong(keepAliveOpts[0]);
-                hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
-            } catch(NumberFormatException e) {
-                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
-            }
+        } catch (Exception ex) {
+            // Best effort: the connection proceeds without monitoring, but the
+            // failure is reported instead of being silently swallowed.
+            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
 
-            try {
+        }
 
-                StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
+        LOG.debug(getClientId() + " MQTT Connection using heart beat of  " + heartBeat + " secs");
+    }
 
-                monitor.setReadCheckTime(hbReadInterval);
-                monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
-                monitor.setWriteCheckTime(hbWriteInterval);
 
-                monitor.startMonitoring();
+    protected void handleException(Throwable exception, MQTTFrame command) throws IOException {
+        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Exception detail", exception);
+        }
 
-            } catch(Exception ex) {
-                hbReadInterval = 0;
-                hbWriteInterval = 0;
-            }
+        try {
+            getMQTTTransport().stop();
+        } catch (Throwable e) {
+            LOG.error("Failed to stop MQTTT Transport ", e);
+        }
+    }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
+    /**
+     * Returns the client id for this connection, resolving it on first use.
+     *
+     * The id comes from the stored CONNECT frame when it carries one;
+     * otherwise an empty string is cached. Once resolved, the value is
+     * returned unchanged on later calls.
+     *
+     * @return the client id, never null
+     */
+    private String getClientId() {
+        if (clientId == null) {
+            if (connect != null && connect.clientId() != null) {
+                clientId = connect.clientId().toString();
+            } else {
+                clientId = "";
             }
         }
+        return clientId;
     }
 
-    protected void sendReceipt(StompFrame command) {
-        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-        if (receiptId != null) {
-            StompFrame sc = new StompFrame();
-            sc.setAction(Stomp.Responses.RECEIPT);
-            sc.setHeaders(new HashMap<String, String>(1));
-            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-            try {
-                sendToStomp(sc);
-            } catch (IOException e) {
-                LOG.warn("Could not send a receipt for " + command, e);
-            }
+    private void stopTransport() {
+        try {
+            getMQTTTransport().stop();
+        } catch (Throwable e) {
+            LOG.debug("Failed to stop MQTT transport ", e);
         }
     }
 }

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolException.java Sun Mar 25 06:33:49 2012
@@ -14,32 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class ProtocolException extends IOException {
+
+public class MQTTProtocolException extends IOException {
 
     private static final long serialVersionUID = -2869735532997332242L;
 
     private final boolean fatal;
 
-    public ProtocolException() {
+    public MQTTProtocolException() {
         this(null);
     }
 
-    public ProtocolException(String s) {
+    public MQTTProtocolException(String s) {
         this(s, false);
     }
 
-    public ProtocolException(String s, boolean fatal) {
+    public MQTTProtocolException(String s, boolean fatal) {
         this(s, fatal, null);
     }
 
-    public ProtocolException(String s, boolean fatal, Throwable cause) {
+    public MQTTProtocolException(String s, boolean fatal, Throwable cause) {
         super(s);
         this.fatal = fatal;
         initCause(cause);

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java Sun Mar 25 06:33:49 2012
@@ -14,49 +14,47 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
-import org.apache.activemq.command.*;
-
-import javax.jms.JMSException;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
+
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.fusesource.mqtt.codec.MQTTFrame;
 
 /**
  * Keeps track of the STOMP subscription so that acking is correctly done.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
  */
-public class StompSubscription {
+public class MQTTSubscription {
 
-    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
-    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
-    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
 
-    protected final ProtocolConverter protocolConverter;
+    protected final MQTTProtocolConverter protocolConverter;
     protected final String subscriptionId;
     protected final ConsumerInfo consumerInfo;
 
     protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
     protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
 
-    protected String ackMode = AUTO_ACK;
     protected ActiveMQDestination destination;
     protected String transformation;
 
-    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
-        this.protocolConverter = stompTransport;
+    public MQTTSubscription(MQTTProtocolConverter protocolConverter, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+        this.protocolConverter = protocolConverter;
         this.subscriptionId = subscriptionId;
         this.consumerInfo = consumerInfo;
         this.transformation = transformation;
     }
 
     void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
-        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+        ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
+        /*
         if (ackMode == CLIENT_ACK) {
             synchronized (this) {
                 dispatchedMessage.put(message.getMessageId(), md);
@@ -69,136 +67,14 @@ public class StompSubscription {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getStompTransport().sendToActiveMQ(ack);
         }
+        */
+        MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+        protocolConverter.getMQTTTransport().sendToActiveMQ(ack);
 
-        boolean ignoreTransformation = false;
-
-        if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
-               message.setReadOnlyProperties(false);
-            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
-        } else {
-            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
-                ignoreTransformation = true;
-            }
-        }
-
-        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
-
-        command.setAction(Stomp.Responses.MESSAGE);
-        if (subscriptionId != null) {
-            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
-        }
-
-        protocolConverter.getStompTransport().sendToStomp(command);
-    }
-
-    synchronized void onStompAbort(TransactionId transactionId) {
-        unconsumedMessage.clear();
-    }
-
-    synchronized void onStompCommit(TransactionId transactionId) {
-        for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-            @SuppressWarnings("rawtypes")
-            Map.Entry entry = (Entry)iter.next();
-            MessageDispatch msg = (MessageDispatch)entry.getValue();
-            if (unconsumedMessage.contains(msg)) {
-                iter.remove();
-            }
-        }
-
-        if (!unconsumedMessage.isEmpty()) {
-            MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
-            protocolConverter.getStompTransport().sendToActiveMQ(ack);
-            unconsumedMessage.clear();
-        }
+        MQTTFrame command = protocolConverter.convertMessage(message);
+        protocolConverter.getMQTTTransport().sendToMQTT(command);
     }
 
-    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
-
-        MessageId msgId = new MessageId(messageId);
-
-        if (!dispatchedMessage.containsKey(msgId)) {
-            return null;
-        }
-
-        MessageAck ack = new MessageAck();
-        ack.setDestination(consumerInfo.getDestination());
-        ack.setConsumerId(consumerInfo.getConsumerId());
-
-        if (ackMode == CLIENT_ACK) {
-            if (transactionId == null) {
-                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-            } else {
-                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
-            }
-            int count = 0;
-            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-
-                @SuppressWarnings("rawtypes")
-                Map.Entry entry = (Entry)iter.next();
-                MessageId id = (MessageId)entry.getKey();
-                MessageDispatch msg = (MessageDispatch)entry.getValue();
-
-                if (transactionId != null) {
-                    if (!unconsumedMessage.contains(msg)) {
-                        unconsumedMessage.add(msg);
-                        count++;
-                    }
-                } else {
-                    iter.remove();
-                    count++;
-                }
-
-                if (id.equals(msgId)) {
-                    ack.setLastMessageId(id);
-                    break;
-                }
-            }
-            ack.setMessageCount(count);
-            if (transactionId != null) {
-                ack.setTransactionId(transactionId);
-            }
-
-        } else if (ackMode == INDIVIDUAL_ACK) {
-            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
-            ack.setMessageID(msgId);
-            if (transactionId != null) {
-                unconsumedMessage.add(dispatchedMessage.get(msgId));
-                ack.setTransactionId(transactionId);
-            }
-            dispatchedMessage.remove(msgId);
-        }
-        return ack;
-    }
-
-    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
-
-        MessageId msgId = new MessageId(messageId);
-
-        if (!dispatchedMessage.containsKey(msgId)) {
-            return null;
-        }
-
-        MessageAck ack = new MessageAck();
-        ack.setDestination(consumerInfo.getDestination());
-        ack.setConsumerId(consumerInfo.getConsumerId());
-        ack.setAckType(MessageAck.POSION_ACK_TYPE);
-        ack.setMessageID(msgId);
-        if (transactionId != null) {
-            unconsumedMessage.add(dispatchedMessage.get(msgId));
-            ack.setTransactionId(transactionId);
-        }
-        dispatchedMessage.remove(msgId);
-
-        return ack;
-    }
-
-    public String getAckMode() {
-        return ackMode;
-    }
-
-    public void setAckMode(String ackMode) {
-        this.ackMode = ackMode;
-    }
 
     public String getSubscriptionId() {
         return subscriptionId;

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java Sun Mar 25 06:33:49 2012
@@ -14,28 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
 
 import org.apache.activemq.command.Command;
+import org.fusesource.mqtt.codec.MQTTFrame;
 
 /**
  * Basic interface that mediates between protocol converter and transport
- *
  */
-public interface StompTransport {
+public interface MQTTTransport {
 
     public void sendToActiveMQ(Command command);
 
-    public void sendToStomp(StompFrame command) throws IOException;
+    public void sendToMQTT(MQTTFrame command) throws IOException;
 
     public X509Certificate[] getPeerCertificates();
 
     public void onException(IOException error);
 
-    public StompInactivityMonitor getInactivityMonitor();
+    public MQTTInactivityMonitor getInactivityMonitor();
 
-    public StompWireFormat getWireFormat();
+    public MQTTWireFormat getWireFormat();
+
+    public void stop() throws Exception;
 }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java Sun Mar 25 06:33:49 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -31,17 +31,17 @@ import org.apache.activemq.wireformat.Wi
 /**
  * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
  */
-public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
+public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
 
     private BrokerContext brokerContext = null;
 
     protected String getDefaultWireFormatType() {
-        return "stomp";
+        return "mqtt";
     }
 
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, format, brokerContext);
+        transport = new MQTTTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
@@ -65,9 +65,9 @@ public class StompTransportFactory exten
 
     @Override
     protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
-        StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format);
+        MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
 
-        StompTransportFilter filter = (StompTransportFilter) transport.narrow(StompTransportFilter.class);
+        MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
         filter.setInactivityMonitor(monitor);
 
         return monitor;

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (from r1303689, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java&r1=1303689&r2=1304984&rev=1304984&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Sun Mar 25 06:33:49 2012
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
 
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
@@ -29,6 +28,7 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.mqtt.codec.MQTTFrame;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,25 +36,25 @@ import org.slf4j.LoggerFactory;
  * The StompTransportFilter normally sits on top of a TcpTransport that has been
  * configured with the StompWireFormat and is used to convert STOMP commands to
  * ActiveMQ commands. All of the conversion work is done by delegating to the
- * ProtocolConverter.
+ * MQTTProtocolConverter.
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-public class StompTransportFilter extends TransportFilter implements StompTransport {
-    private static final Logger LOG = LoggerFactory.getLogger(StompTransportFilter.class);
-    private static final Logger TRACE = LoggerFactory.getLogger(StompTransportFilter.class.getPackage().getName() + ".StompIO");
-    private final ProtocolConverter protocolConverter;
-    private StompInactivityMonitor monitor;
-    private StompWireFormat wireFormat;
+public class MQTTTransportFilter extends TransportFilter implements MQTTTransport {
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class);
+    private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO");
+    private final MQTTProtocolConverter protocolConverter;
+    private MQTTInactivityMonitor monitor;
+    private MQTTWireFormat wireFormat;
 
     private boolean trace;
 
-    public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
+    public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
         super(next);
-        this.protocolConverter = new ProtocolConverter(this, brokerContext);
+        this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
 
-        if (wireFormat instanceof StompWireFormat) {
-            this.wireFormat = (StompWireFormat) wireFormat;
+        if (wireFormat instanceof MQTTWireFormat) {
+            this.wireFormat = (MQTTWireFormat) wireFormat;
         }
     }
 
@@ -73,7 +73,7 @@ public class StompTransportFilter extend
                 TRACE.trace("Received: \n" + command);
             }
 
-            protocolConverter.onStompCommand((StompFrame) command);
+            protocolConverter.onMQTTCommand((MQTTFrame) command);
         } catch (IOException e) {
             onException(e);
         } catch (JMSException e) {
@@ -88,7 +88,7 @@ public class StompTransportFilter extend
         }
     }
 
-    public void sendToStomp(StompFrame command) throws IOException {
+    public void sendToMQTT(MQTTFrame command) throws IOException {
         if (trace) {
             TRACE.trace("Sending: \n" + command);
         }
@@ -118,25 +118,18 @@ public class StompTransportFilter extend
     }
 
     @Override
-    public StompInactivityMonitor getInactivityMonitor() {
+    public MQTTInactivityMonitor getInactivityMonitor() {
         return monitor;
     }
 
-    public void setInactivityMonitor(StompInactivityMonitor monitor) {
+    public void setInactivityMonitor(MQTTInactivityMonitor monitor) {
         this.monitor = monitor;
     }
 
     @Override
-    public StompWireFormat getWireFormat() {
+    public MQTTWireFormat getWireFormat() {
         return this.wireFormat;
     }
 
-    public String getDefaultHeartBeat() {
-        return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null;
-    }
-
-    public void setDefaultHeartBeat(String defaultHeartBeat) {
-        protocolConverter.setDefaultHeartBeat(defaultHeartBeat);
-    }
 
 }



Mime
View raw message