activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Applying patch AMQ-5074: MQTT paths with empty levels are not handled correctly.
Date Mon, 24 Feb 2014 14:05:35 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 2b3c47775 -> e7e317dc7


Applying patch AMQ-5074: MQTT paths with empty levels are not handled correctly.

Thanks Dhiraj!

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e7e317dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e7e317dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e7e317dc

Branch: refs/heads/trunk
Commit: e7e317dc7ed00de26ba905c34debaf38184f497b
Parents: 2b3c477
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Mon Feb 24 09:05:23 2014 -0500
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Mon Feb 24 09:05:23 2014 -0500

----------------------------------------------------------------------
 .../activemq/command/ActiveMQDestination.java   |  15 ++-
 .../filter/AnyChildDestinationNode.java         |   1 -
 .../activemq/filter/DestinationFilter.java      |  23 ----
 .../apache/activemq/filter/DestinationMap.java  |   1 -
 .../activemq/filter/DestinationMapNode.java     |  17 ++-
 .../filter/PrefixDestinationFilter.java         |  12 +-
 .../activemq/transport/mqtt/MQTTTest.java       | 113 ++++++++++++++++---
 .../activemq/filter/DestinationMapTest.java     |  22 +++-
 8 files changed, 148 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
index 3bd4f25..0cea4e5 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
@@ -283,14 +283,17 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements
Da
         }
 
         List<String> l = new ArrayList<String>();
-        StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
-        while (iter.hasMoreTokens()) {
-            String name = iter.nextToken().trim();
-            if (name.length() == 0) {
-                continue;
+        StringBuilder level = new StringBuilder();
+        final char separator = PATH_SEPERATOR.charAt(0);
+        for (char c : physicalName.toCharArray()) {
+            if (c == separator) {
+                l.add(level.toString());
+                level.delete(0, level.length());
+            } else {
+                level.append(c);
             }
-            l.add(name);
         }
+        l.add(level.toString());
 
         destinationPaths = new String[l.size()];
         l.toArray(destinationPaths);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java
b/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java
index cf35ac8..985df79 100644
--- a/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java
@@ -100,7 +100,6 @@ public class AnyChildDestinationNode implements DestinationNode {
         return answer;
     }
 
-
     public Collection getChildren() {
         Collection answer = new ArrayList();
         Iterator iter = getChildNodes().iterator();

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java
b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java
index 34f4b8a..0424524 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java
@@ -53,7 +53,6 @@ public abstract class DestinationFilter implements BooleanExpression {
             return new CompositeDestinationFilter(destination);
         }
         String[] paths = DestinationPath.getDestinationPaths(destination);
-        paths = rationalizePaths(paths);
         int idx = paths.length - 1;
         if (idx >= 0) {
             String lastPath = paths[idx];
@@ -73,26 +72,4 @@ public abstract class DestinationFilter implements BooleanExpression {
         return new SimpleDestinationFilter(destination);
     }
 
-    /**
-     * Look for the case where any CHILD is followed by any decsendant
-     */
-    public static String[] rationalizePaths(String[] paths) {
-        String[] result = paths;
-        if (paths != null && paths.length > 1) {
-            int last = paths.length - 1;
-            if (paths[last].equals(ANY_DESCENDENT)) {
-                last -= 1;
-                if (paths[last].equals(ANY_DESCENDENT) || paths[last].equals(ANY_CHILD))
{
-
-                    result = new String[paths.length-1];
-                    System.arraycopy(paths,0,result,0,result.length);
-                    result[result.length-1] = ANY_DESCENDENT;
-                    result = rationalizePaths(result);
-                }
-            }
-        }
-
-        return result;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
index 2c71578..48a4cd3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java
@@ -84,7 +84,6 @@ public class DestinationMap {
             return;
         }
         String[] paths = key.getDestinationPaths();
-        paths = DestinationFilter.rationalizePaths(paths);
         getRootNode(key).add(paths, 0, value);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
index a2360a0..bd82a93 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java
@@ -92,7 +92,7 @@ public class DestinationMapNode implements DestinationNode {
     }
 
     /**
-     * Returns a mutable List of the values available at this node in the tree
+     * Removes values available at this node in the tree
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public List removeValues() {
@@ -112,7 +112,11 @@ public class DestinationMapNode implements DestinationNode {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     protected void removeDesendentValues(Set answer) {
-        answer.addAll(removeValues());
+        for (Map.Entry<String, DestinationNode> child : childNodes.entrySet()) {
+            // remove all the values from the child
+            answer.addAll(child.getValue().removeValues());
+            answer.addAll(child.getValue().removeDesendentValues());
+        }
     }
 
     /**
@@ -162,6 +166,7 @@ public class DestinationMapNode implements DestinationNode {
                 break;
             }
 
+            // TODO is this correct, we are appending wildcard values here???
             node.appendMatchingWildcards(answer, paths, i);
             if (path.equals(ANY_CHILD)) {
                 // node = node.getAnyChildNode();
@@ -179,10 +184,9 @@ public class DestinationMapNode implements DestinationNode {
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public void appendDescendantValues(Set answer) {
-        answer.addAll(values);
-
-        // lets add all the children too
+        // add children values, then recursively add their children
         for(DestinationNode child : childNodes.values()) {
+            answer.addAll(child.getValues());
             child.appendDescendantValues(answer);
         }
     }
@@ -208,6 +212,9 @@ public class DestinationMapNode implements DestinationNode {
         }
         wildCardNode = getChild(ANY_DESCENDENT);
         if (wildCardNode != null) {
+            // for a wildcard Node match, add all values of the descendant node
+            answer.addAll(wildCardNode.getValues());
+            // and all descendants for paths like ">.>"
             answer.addAll(wildCardNode.getDesendentValues());
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
b/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
index 5f9b6bc..d83df46 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java
@@ -36,7 +36,13 @@ public class PrefixDestinationFilter extends DestinationFilter {
      * @param prefixes
      */
     public PrefixDestinationFilter(String[] prefixes, byte destinationType) {
-        this.prefixes = prefixes;
+        // collapse duplicate '>' at the end of the path
+        int lastIndex = prefixes.length - 1;
+        while (lastIndex >= 0 && ANY_DESCENDENT.equals(prefixes[lastIndex])) {
+            lastIndex--;
+        }
+        this.prefixes = new String[lastIndex + 2];
+        System.arraycopy(prefixes, 0, this.prefixes, 0, this.prefixes.length);
         this.destinationType = destinationType;
     }
 
@@ -59,11 +65,11 @@ public class PrefixDestinationFilter extends DestinationFilter {
             //want to look for the case where A matches A.>
             boolean match = true;
             for (int i = 0; (i < path.length && match); i++){
-                   match &= matches(prefixes[i],path[i]);
+                   match = matches(prefixes[i], path[i]);
             }
             //paths get compacted - e.g. A.*.> will be compacted to A.> and by definition
- the last element on
             //the prefix will be >
-            if (match && prefixes.length == (path.length + 1)){
+            if (match && prefixes.length == (path.length + 1)) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3e1e6cb..9ece80e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -326,6 +326,99 @@ public class MQTTTest extends AbstractMQTTTest {
         publisher.disconnect();
     }
 
+    @Test(timeout=30000)
+    public void testValidZeroLengthClientId() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("");
+        mqtt.setCleanSession(true);
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        connection.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMQTTPathPatterns() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("");
+        mqtt.setCleanSession(true);
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        final String RETAINED = "RETAINED";
+        String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"};
+        for (String topic : topics) {
+            // test retained message
+            connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
+
+            connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
+            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            assertEquals(RETAINED + topic, new String(msg.getPayload()));
+            msg.ack();
+
+            // test non-retained message
+            connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
+            msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            assertEquals(topic, new String(msg.getPayload()));
+            msg.ack();
+
+            connection.unsubscribe(new String[] {topic});
+        }
+        connection.disconnect();
+
+        // test wildcard patterns with above topics
+        String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"};
+        for (String wildcard : wildcards) {
+            final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+",
"[^/]*"));
+
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+
+            // test retained messages
+            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            do {
+                assertNotNull("RETAINED null " + wildcard, msg);
+                assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
+                assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(),
+                    pattern.matcher(msg.getTopic()).matches());
+                msg.ack();
+                msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            } while (msg != null);
+
+            // connection is borked after timeout in connection.receive()
+            connection.disconnect();
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+
+            // test non-retained message
+            for (String topic : topics) {
+                connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
+            }
+            msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            do {
+                assertNotNull("Non-retained Null " + wildcard, msg);
+                assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(),
+                    pattern.matcher(msg.getTopic()).matches());
+                msg.ack();
+                msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            } while (msg != null);
+
+            connection.unsubscribe(new String[] { wildcard });
+            connection.disconnect();
+        }
+    }
+
     @Test(timeout = 60 * 1000)
     public void testMQTTRetainQoS() throws Exception {
         addMQTTConnector();
@@ -345,13 +438,7 @@ public class MQTTTest extends AbstractMQTTTest {
                 public void onReceive(MQTTFrame frame) {
                     // validate the QoS
                     if (frame.messageType() == PUBLISH.TYPE) {
-                        PUBLISH publish = new PUBLISH();
-                        try {
-                            publish.decode(frame);
-                        } catch (ProtocolException e) {
-                            fail ("Failed decoding " + e.getMessage());
-                        }
-                        actualQoS[0] = publish.qos().ordinal();
+                        actualQoS[0] = frame.qos().ordinal();
                     }
                 }
             });
@@ -370,6 +457,7 @@ public class MQTTTest extends AbstractMQTTTest {
                 waitCount++;
             }
             assertEquals(i, actualQoS[0]);
+            msg.ack();
 
             connection.unsubscribe(new String[]{topic});
             connection.disconnect();
@@ -392,13 +480,7 @@ public class MQTTTest extends AbstractMQTTTest {
             public void onReceive(MQTTFrame frame) {
                 // validate the QoS
                 if (frame.messageType() == PUBLISH.TYPE) {
-                    PUBLISH publish = new PUBLISH();
-                    try {
-                        publish.decode(frame);
-                    } catch (ProtocolException e) {
-                        fail("Failed decoding " + e.getMessage());
-                    }
-                    actualQoS[0] = publish.qos().ordinal();
+                    actualQoS[0] = frame.qos().ordinal();
                 }
             }
         });
@@ -416,6 +498,7 @@ public class MQTTTest extends AbstractMQTTTest {
             final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
             assertEquals(RETAIN, new String(msg.getPayload()));
+            msg.ack();
             int waitCount = 0;
             while (actualQoS[0] == -1 && waitCount < 10) {
                 Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7e317dc/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
index 5de46b7..2f0f92c 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java
@@ -23,11 +23,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import junit.framework.TestCase;
 
 public class DestinationMapTest extends TestCase {
     protected DestinationMap map = new DestinationMap();
@@ -231,6 +230,25 @@ public class DestinationMapTest extends TestCase {
         assertMapValue("TEST.D1", null);
     }
 
+    public void testMQTTMappedWildcards() throws Exception {
+        put("TopicA", v1);
+        put(".TopicA", v2);
+        put("TopicA.", v3);
+        put(".", v4);
+        put("..TopicA", v5);
+        put("..", v6);
+
+        // test wildcard patterns "#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"
+        assertMapValue(">", v1, v2, v3, v4, v5, v6);
+        assertMapValue("*", v1);
+        assertMapValue("*.>", v1, v2, v3, v4, v5, v6);
+        assertMapValue(".*", v2, v4);
+        assertMapValue("*.", v3, v4);
+        assertMapValue("*.*", v2, v3, v4);
+        assertMapValue("*.*.", v6);
+        assertMapValue("*.*.*", v5, v6);
+    }
+
     public void testStoreAndLookupAllWildcards() throws Exception {
         loadSample2();
 


Mime
View raw message