activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhira...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5292 Added support for publishDollarTopics transport option for MQTT to support Topics with '$' prefix
Date Mon, 28 Jul 2014 19:38:53 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 207d4cdee -> 73908d649


https://issues.apache.org/jira/browse/AMQ-5292 Added support for publishDollarTopics transport
option for MQTT to support Topics with '$' prefix


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/73908d64
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/73908d64
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/73908d64

Branch: refs/heads/trunk
Commit: 73908d6498504dec2e23fcbbc8f27ab56f6c4a79
Parents: 207d4cd
Author: Dhiraj Bokde <dhirajsb@yahoo.com>
Authored: Mon Jul 28 12:38:43 2014 -0700
Committer: Dhiraj Bokde <dhirajsb@yahoo.com>
Committed: Mon Jul 28 12:38:43 2014 -0700

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 11 +++-
 .../transport/mqtt/MQTTTransportFilter.java     |  8 +++
 .../activemq/transport/mqtt/MQTTTest.java       | 57 ++++++++++++++++----
 3 files changed, 66 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/73908d64/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 769656f..3e64556 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -133,6 +133,7 @@ public class MQTTProtocolConverter {
     private int activeMQSubscriptionPrefetch=1;
     protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
     private final MQTTPacketIdGenerator packetIdGenerator;
+    private boolean publishDollarTopics;
 
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService)
{
         this.mqttTransport = mqttTransport;
@@ -152,7 +153,7 @@ public class MQTTProtocolConverter {
         // Lets intercept message send requests..
         if( command instanceof ActiveMQMessage) {
             ActiveMQMessage msg = (ActiveMQMessage) command;
-            if( msg.getDestination().getPhysicalName().startsWith("$") ) {
+            if( !getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")
) {
                 // We don't allow users to send to $ prefixed topics to avoid failing MQTT
3.1.1 spec requirements
                 if( handler!=null ) {
                     try {
@@ -971,4 +972,12 @@ public class MQTTProtocolConverter {
     public MQTTPacketIdGenerator getPacketIdGenerator() {
         return packetIdGenerator;
     }
+
+    public void setPublishDollarTopics(boolean publishDollarTopics) {
+        this.publishDollarTopics = publishDollarTopics;
+    }
+
+    public boolean getPublishDollarTopics() {
+        return publishDollarTopics;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/73908d64/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 181d235..8612c25 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -189,6 +189,14 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
         protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
     }
 
+    public boolean getPublishDollarTopics() {
+        return protocolConverter != null && protocolConverter.getPublishDollarTopics();
+    }
+
+    public void setPublishDollarTopics(boolean publishDollarTopics) {
+        protocolConverter.setPublishDollarTopics(publishDollarTopics);
+    }
+
     public int getActiveMQSubscriptionPrefetch() {
         return protocolConverter.getActiveMQSubscriptionPrefetch();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/73908d64/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index ec7f1cc..fc91f27 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,14 +16,6 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -44,6 +35,13 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -1045,6 +1043,47 @@ public class MQTTTest extends MQTTTestSupport {
         connection.disconnect();
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testPublishDollarTopics() throws Exception {
+        stopBroker();
+        startBroker();
+
+        MQTT mqtt = createMQTTConnection();
+        final String clientId = "publishDollar";
+        mqtt.setClientId(clientId);
+        mqtt.setKeepAlive((short) 2);
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final String DOLLAR_TOPIC = "$TopicA";
+        connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
+        connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+        Message message = connection.receive(10, TimeUnit.SECONDS);
+        assertNull("Publish enabled for $ Topics by default", message);
+        connection.disconnect();
+
+        stopBroker();
+        protocolConfig = "transport.publishDollarTopics=true";
+        startBroker();
+
+        mqtt = createMQTTConnection();
+        mqtt.setClientId(clientId);
+        mqtt.setKeepAlive((short) 2);
+        connection = mqtt.blockingConnection();
+        connection.connect();
+
+        connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
+        connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+        message = connection.receive(10, TimeUnit.SECONDS);
+        assertNotNull(message);
+        message.ack();
+        assertEquals("Message body", DOLLAR_TOPIC, new String(message.getPayload()));
+
+        connection.disconnect();
+    }
+
     @Test(timeout = 30 * 10000)
     public void testJmsMapping() throws Exception {
         // start up jms consumer


Mime
View raw message