activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1500057 - in /activemq/trunk/activemq-mqtt/src: main/java/org/apache/activemq/transport/mqtt/ test/java/org/apache/activemq/transport/mqtt/
Date Fri, 05 Jul 2013 16:21:29 GMT
Author: rajdavies
Date: Fri Jul  5 16:21:28 2013
New Revision: 1500057

URL: http://svn.apache.org/r1500057
Log:
added test for unsubscribe

Modified:
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1500057&r1=1500056&r2=1500057&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java	(original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java	Fri Jul  5 16:21:28 2013
@@ -277,6 +277,7 @@ public class MQTTProtocolConverter {
 
     void onMQTTDisconnect() throws MQTTProtocolException {
         if (connected.get()) {
+            connected.set(false);
             sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
             sendToActiveMQ(new ShutdownInfo(), null);
         }
@@ -542,7 +543,7 @@ public class MQTTProtocolConverter {
 
     public void onTransportError() {
         if (connect != null) {
-            if (connect.willTopic() != null && connect.willMessage() != null) {
+            if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
                 try {
                     PUBLISH publish = new PUBLISH();
                     publish.topicName(connect.willTopic());

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java?rev=1500057&r1=1500056&r2=1500057&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java	(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java	Fri Jul  5 16:21:28 2013
@@ -22,7 +22,6 @@ import java.security.ProtectionDomain;
 import java.util.LinkedList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -77,27 +76,27 @@ public abstract class AbstractMQTTTest e
         super.tearDown();
     }
 
+
     @Test(timeout=300000)
-    public void testWillNotSentOnClose() throws Exception {
+    public void testSendAndReceiveMQTT() throws Exception {
         addMQTTConnector();
         brokerService.start();
         final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
         initializeConnection(subscriptionProvider);
 
-        String willTopic = "lastWillAndTestament";
 
-        subscriptionProvider.subscribe(willTopic,AT_MOST_ONCE);
+        subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
 
-        final AtomicInteger count = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
 
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
-                for (int i = 0; i < 1; i++){
+                for (int i = 0; i < numberOfMessages; i++){
                     try {
                         byte[] payload = subscriptionProvider.receive(10000);
-                        assertNull("Should get a message", payload);
-                        count.incrementAndGet();
+                        assertNotNull("Should get a message", payload);
+                        latch.countDown();
                     } catch (Exception e) {
                         e.printStackTrace();
                         break;
@@ -109,29 +108,31 @@ public abstract class AbstractMQTTTest e
         thread.start();
 
         final MQTTClientProvider publishProvider = getMQTTClientProvider();
-        publishProvider.setWillTopic(willTopic);
-        publishProvider.setWillMessage("EverythingGoesToRob");
         initializeConnection(publishProvider);
 
-        Thread.sleep(1000);
-        publishProvider.disconnect();
+        for (int i = 0; i < numberOfMessages; i++){
+            String payload = "Message " + i;
+            publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
+        }
 
-        assertEquals(0, count.get());
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
         subscriptionProvider.disconnect();
         publishProvider.disconnect();
     }
 
     @Test(timeout=300000)
-    public void testSendAndReceiveMQTT() throws Exception {
+    public void testUnsubscribeMQTT() throws Exception {
         addMQTTConnector();
         brokerService.start();
         final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
         initializeConnection(subscriptionProvider);
 
+        String topic = "foo/bah";
 
-        subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
+        subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
 
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
 
         Thread thread = new Thread(new Runnable() {
             @Override
@@ -156,7 +157,10 @@ public abstract class AbstractMQTTTest e
 
         for (int i = 0; i < numberOfMessages; i++){
             String payload = "Message " + i;
-            publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
+            if (i == numberOfMessages/2){
+                subscriptionProvider.unsubscribe(topic);
+            }
+            publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
         }
 
         latch.await(10, TimeUnit.SECONDS);

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java?rev=1500057&r1=1500056&r2=1500057&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java	(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java	Fri Jul  5 16:21:28 2013
@@ -58,6 +58,11 @@ class FuseMQQTTClientProvider implements
     }
 
     @Override
+    public void unsubscribe(String topic) throws Exception {
+        connection.unsubscribe(new String[]{topic});
+    }
+
+    @Override
     public byte[] receive(int timeout) throws Exception {
         byte[] result = null;
         Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
@@ -82,4 +87,19 @@ class FuseMQQTTClientProvider implements
     public void setWillTopic(String topic) {
         mqtt.setWillTopic(topic);
     }
+
+    @Override
+    public void setClientId(String clientId) {
+        mqtt.setClientId(clientId);
+    }
+
+    @Override
+    public void kill() throws Exception {
+        connection.kill();
+    }
+
+    @Override
+    public void setKeepAlive(int keepAlive) throws Exception {
+        mqtt.setKeepAlive((short) keepAlive);
+    }
 }

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java?rev=1500057&r1=1500056&r2=1500057&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java	(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java	Fri Jul  5 16:21:28 2013
@@ -21,9 +21,13 @@ public interface  MQTTClientProvider {
     void disconnect() throws Exception;
     void publish(String topic,byte[] payload,int qos) throws Exception;
     void subscribe(String topic,int qos) throws Exception;
+    void unsubscribe(String topic) throws Exception;
     byte[] receive(int timeout) throws Exception;
     void setSslContext(javax.net.ssl.SSLContext sslContext);
     void setWillMessage(String string);
     void setWillTopic(String topic);
+    void setClientId(String clientId);
+    void kill() throws Exception;
+    void setKeepAlive(int keepAlive) throws Exception;
 
 }



Mime
View raw message