activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1492447 - in /activemq/trunk/activemq-mqtt/src: main/java/org/apache/activemq/transport/mqtt/ test/java/org/apache/activemq/transport/mqtt/
Date Wed, 12 Jun 2013 22:10:35 GMT
Author: tabish
Date: Wed Jun 12 22:10:34 2013
New Revision: 1492447

URL: http://svn.apache.org/r1492447
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4576

Modified:
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1492447&r1=1492446&r2=1492447&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
(original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
Wed Jun 12 22:10:34 2013
@@ -314,7 +314,8 @@ class MQTTProtocolConverter {
         consumerInfo.setDispatchAsync(true);
         if (!connect.cleanSession() && (connect.clientId() != null)) {
             //by default subscribers are persistent
-            consumerInfo.setSubscriptionName(connect.clientId().toString());
+            consumerInfo.setSubscriptionName(
+                connect.clientId().toString() + topic.name().toString());
         }
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
 

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java?rev=1492447&r1=1492446&r2=1492447&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
Wed Jun 12 22:10:34 2013
@@ -17,8 +17,6 @@
 package org.apache.activemq.transport.mqtt;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +57,7 @@ public abstract class AbstractMQTTTest e
     protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
     protected int numberOfMessages;
 
+    @Override
     @Before
     public void setUp() throws Exception {
         super.setUp();
@@ -70,6 +69,7 @@ public abstract class AbstractMQTTTest e
         this.numberOfMessages = 1000;
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
         if (brokerService != null) {

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1492447&r1=1492446&r2=1492447&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Wed Jun 12 22:10:34 2013
@@ -19,6 +19,9 @@ package org.apache.activemq.transport.mq
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
 import org.junit.Test;
@@ -69,6 +72,43 @@ public class MQTTTest extends AbstractMQ
         connection.disconnect();
     }
 
+    public void testSubscribeMultipleTopics() throws Exception {
+        byte[] payload = new byte[1024 * 32];
+        for (int i = 0; i < payload.length; i++){
+            payload[i] = '2';
+        }
+
+        addMQTTConnector();
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", QoS.EXACTLY_ONCE)};
+        connection.subscribe(topics);
+
+        for (Topic topic : topics) {
+            connection.publish(topic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+        }
+
+        int received = 0;
+        for (int i = 0; i < topics.length; ++i) {
+            Message message = connection.receive();
+            assertNotNull(message);
+            received++;
+            payload = message.getPayload();
+            String messageContent = new String(payload);
+            LOG.info("Received message from topic: " + message.getTopic() +
+                     " Message content: " + messageContent);
+            message.ack();
+        }
+
+        assertEquals("Should have received " + topics.length + " messages", topics.length,
received);
+    }
+
     @Test(timeout=30000)
     public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
         // default keep alive in milliseconds



Mime
View raw message