activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1341820 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Topic.java test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Date Wed, 23 May 2012 10:25:35 GMT
Author: rajdavies
Date: Wed May 23 10:25:34 2012
New Revision: 1341820

URL: http://svn.apache.org/viewvc?rev=1341820&view=rev
Log:
Additional fixes for https://issues.apache.org/jira/browse/AMQ-3855 - a timing issue when
adding wildcard subscriptions can result in duplicate messages being sent to MQTT

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1341820&r1=1341819&r2=1341820&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed May 23 10:25:34 2012
@@ -109,9 +109,6 @@ public class Topic extends BaseDestinati
     }
 
     public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
-
-       super.addSubscription(context, sub);
-
         if (!sub.getConsumerInfo().isDurable()) {
 
             // Do a retroactive recovery if needed.
@@ -121,23 +118,34 @@ public class Topic extends BaseDestinati
                 // while we are recovering a subscription to avoid out of order messages.
                 dispatchLock.writeLock().lock();
                 try {
+                    boolean applyRecovery = false;
                     synchronized (consumers) {
-                        sub.add(context, this);
-                        consumers.add(sub);
+                        if (!consumers.contains(sub)){
+                            sub.add(context, this);
+                            consumers.add(sub);
+                            applyRecovery=true;
+                            super.addSubscription(context, sub);
+                        }
+                    }
+                    if (applyRecovery){
+                        subscriptionRecoveryPolicy.recover(context, this, sub);
                     }
-                    subscriptionRecoveryPolicy.recover(context, this, sub);
                 } finally {
                     dispatchLock.writeLock().unlock();
                 }
 
             } else {
                 synchronized (consumers) {
-                    sub.add(context, this);
-                    consumers.add(sub);
+                    if (!consumers.contains(sub)){
+                        sub.add(context, this);
+                        consumers.add(sub);
+                        super.addSubscription(context, sub);
+                    }
                 }
             }
         } else {
             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+            super.addSubscription(context, sub);
     		sub.add(context, this);
     		if(dsub.isActive()) {
 	        	synchronized (consumers) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1341820&r1=1341819&r2=1341820&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Wed May 23 10:25:34 2012
@@ -255,7 +255,7 @@ public class MQTTTest {
         javax.jms.Topic jmsTopic = s.createTopic("foo.far");
         MessageProducer producer = s.createProducer(jmsTopic);
 
-        Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)};
+        Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
         connection.subscribe(topics);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "This is Test Message: " + i;



Mime
View raw message