activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1341521 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Date Tue, 22 May 2012 15:22:56 GMT
Author: rajdavies
Date: Tue May 22 15:22:55 2012
New Revision: 1341521

URL: http://svn.apache.org/viewvc?rev=1341521&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3855

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1341521&r1=1341520&r2=1341521&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
Tue May 22 15:22:55 2012
@@ -261,8 +261,6 @@ class MQTTProtocolConverter {
 
     QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
         ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
-
-
         if (destination == null) {
             throw new MQTTProtocolException("Invalid Destination.");
         }
@@ -458,31 +456,15 @@ class MQTTProtocolConverter {
         }
         result.topicName(topicName);
 
-        ByteSequence byteSequence = message.getContent();
-        if (message.isCompressed()) {
-            Inflater inflater = new Inflater();
-            inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
-            byte[] data = new byte[4096];
-            int read;
-            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-            while ((read = inflater.inflate(data, 0, data.length)) != 0) {
-                bytesOut.write(data, 0, read);
-            }
-            byteSequence = bytesOut.toByteSequence();
-        }
 
         if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
-            if (byteSequence.getLength() > 4) {
-                byte[] content = new byte[byteSequence.getLength() - 4];
-                System.arraycopy(byteSequence.data, 4, content, 0, content.length);
-                result.payload(new Buffer(content));
-            } else {
                 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+                msg.setReadOnlyBody(true);
                 String messageText = msg.getText();
                 if (messageText != null) {
-                    result.payload(new Buffer(msg.getText().getBytes("UTF-8")));
+                    result.payload(new Buffer(messageText.getBytes("UTF-8")));
                 }
-            }
+
 
         } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE)
{
 
@@ -491,8 +473,29 @@ class MQTTProtocolConverter {
             byte[] data = new byte[(int) msg.getBodyLength()];
             msg.readBytes(data);
             result.payload(new Buffer(data));
-        } else {
+        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){
+            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+            msg.setReadOnlyBody(true);
+            Map map = msg.getContentMap();
+            if (map != null){
+                result.payload(new Buffer(map.toString().getBytes("UTF-8")));
+            }
+        }
+
+        else {
+            ByteSequence byteSequence = message.getContent();
             if (byteSequence != null && byteSequence.getLength() > 0) {
+                if (message.isCompressed()){
+                    Inflater inflater = new Inflater();
+                    inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length);
+                    byte[]  data = new byte[4096];
+                    int read;
+                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                    while((read = inflater.inflate(data)) != 0){
+                       bytesOut.write(data,0,read);
+                    }
+                    byteSequence = bytesOut.toByteSequence();
+                }
                 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
             }
         }
@@ -620,9 +623,9 @@ class MQTTProtocolConverter {
     }
 
     private String convertMQTTToActiveMQ(String name) {
-        String result = name.replace('>', '#');
-        result = result.replace('*', '+');
-        result = result.replace('.', '/');
+        String result = name.replace('#', '>');
+        result = result.replace('+', '*');
+        result = result.replace('/', '.');
         return result;
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1341521&r1=1341520&r2=1341521&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Tue May 22 15:22:55 2012
@@ -17,9 +17,13 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -63,6 +67,47 @@ public class MQTTTest {
     }
 
     @Test
+    public void testSendAndReceiveMQTT() throws Exception {
+        addMQTTConnector(brokerService);
+        brokerService.start();
+        MQTT mqtt = new MQTT();
+        final BlockingConnection subscribeConnection = mqtt.blockingConnection();
+        subscribeConnection.connect();
+        Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
+        Topic[] topics = {topic};
+        subscribeConnection.subscribe(topics);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < numberOfMessages; i++){
+                    try {
+                        Message message = subscribeConnection.receive();
+                        message.ack();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+
+                }
+            }
+        });
+        thread.start();
+
+        BlockingConnection publisherConnection = mqtt.blockingConnection();
+        publisherConnection.connect();
+        for (int i = 0; i < numberOfMessages; i++){
+            String payload = "Message " + i;
+            publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
+        }
+
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+
+    }
+
+    @Test
     public void testSendAndReceiveAtMostOnce() throws Exception {
         addMQTTConnector(brokerService);
         brokerService.start();
@@ -172,7 +217,7 @@ public class MQTTTest {
         brokerService.start();
         MQTT mqtt = createMQTTConnection();
         BlockingConnection connection = mqtt.blockingConnection();
-        final String DESTINATION_NAME = "foo";
+        final String DESTINATION_NAME = "foo.*";
         connection.connect();
 
         ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
@@ -183,7 +228,7 @@ public class MQTTTest {
 
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
-            connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+            connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
             ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
             ByteSequence bs = message.getContent();
             assertEquals(payload, new String(bs.data, bs.offset, bs.length));
@@ -194,6 +239,36 @@ public class MQTTTest {
         connection.disconnect();
     }
 
+    @Test
+    public void testSendJMSReceiveMQTT() throws Exception {
+        addMQTTConnector(brokerService);
+        brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive(Short.MAX_VALUE);
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        javax.jms.Topic jmsTopic = s.createTopic("foo.far");
+        MessageProducer producer = s.createProducer(jmsTopic);
+
+        Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)};
+        connection.subscribe(topics);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "This is Test Message: " + i;
+            TextMessage sendMessage = s.createTextMessage(payload);
+            producer.send(sendMessage);
+            Message message = connection.receive();
+            message.ack();
+            assertEquals(payload, new String(message.getPayload()));
+        }
+        connection.disconnect();
+    }
+
+
     protected void addMQTTConnector(BrokerService brokerService) throws Exception {
         brokerService.addConnector("mqtt://localhost:1883");
     }



Mime
View raw message