activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1033607 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Date Wed, 10 Nov 2010 17:44:07 GMT
Author: gtully
Date: Wed Nov 10 17:44:06 2010
New Revision: 1033607

URL: http://svn.apache.org/viewvc?rev=1033607&view=rev
Log:
Resolution to the duplicate issue from https://issues.apache.org/activemq/browse/AMQ-2980: contention
over the active flag caused premature dispatch on reactivation, such that there could be duplicates.
Fixing the contention sorts this out; it was the final piece of the puzzle, and the test now works as expected.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1033607&r1=1033606&r2=1033607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Wed Nov 10 17:44:06 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.regio
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -48,7 +49,7 @@ public class DurableTopicSubscription ex
     private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations
= new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
-    private boolean active;
+    private AtomicBoolean active = new AtomicBoolean();
 
     public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws JMSException {
@@ -62,11 +63,11 @@ public class DurableTopicSubscription ex
     }
 
     public boolean isActive() {
-        return active;
+        return active.get();
     }
 
     public boolean isFull() {
-        return !active || super.isFull();
+        return !active.get() || super.isFull();
     }
 
     public void gc() {
@@ -100,7 +101,7 @@ public class DurableTopicSubscription ex
                 throw jmsEx;
             }
         }
-        if (active || keepDurableSubsActive) {
+        if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic)destination;
             topic.activate(context, this);
             if (pending.isEmpty(topic)) {
@@ -112,8 +113,7 @@ public class DurableTopicSubscription ex
 
     public void activate(SystemUsage memoryManager, ConnectionContext context,
             ConsumerInfo info) throws Exception {
-        if (!active) {
-            this.active = true;
+        if (!active.get()) {
             this.context = context;
             this.info = info;
             LOG.debug("Activating " + this);
@@ -145,6 +145,7 @@ public class DurableTopicSubscription ex
                     }
                 }
             }
+            this.active.set(true);
             dispatchPending();
             this.usageManager.getMemoryUsage().addUsageListener(this);
         }
@@ -152,7 +153,7 @@ public class DurableTopicSubscription ex
 
     public void deactivate(boolean keepDurableSubsActive) throws Exception {
         LOG.debug("Deactivating " + this);
-        active = false;
+        active.set(false);
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (pending) {
             pending.stop();
@@ -211,7 +212,7 @@ public class DurableTopicSubscription ex
     }
 
     public void add(MessageReference node) throws Exception {
-        if (!active && !keepDurableSubsActive) {
+        if (!active.get() && !keepDurableSubsActive) {
             return;
         }
         super.add(node);
@@ -224,7 +225,7 @@ public class DurableTopicSubscription ex
     }
 
     public int getPendingQueueSize() {
-        if (active || keepDurableSubsActive) {
+        if (active.get() || keepDurableSubsActive) {
             return super.getPendingQueueSize();
         }
         // TODO: need to get from store
@@ -236,7 +237,7 @@ public class DurableTopicSubscription ex
     }
 
     protected boolean canDispatch(MessageReference node) {
-        return active;
+        return active.get();
     }
 
     protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference
node) throws IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1033607&r1=1033606&r2=1033607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Wed Nov 10 17:44:06 2010
@@ -81,7 +81,7 @@ public class JDBCMessagePriorityTest ext
         sub = sess.createDurableSubscriber(topic, subName);
         for (int i = 0; i < MSG_NUM * 4; i++) {
             Message msg = sub.receive(10000);
-            LOG.info("received i=" + i + ", m=" + (msg!=null?
+            LOG.debug("received i=" + i + ", m=" + (msg!=null?
                     msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
                     : null) );
             assertNotNull("Message " + i + " was null", msg);
@@ -129,10 +129,10 @@ public class JDBCMessagePriorityTest ext
         sub = consumerSession.createDurableSubscriber(topic, subName);
         for (int i=0; i < MSG_NUM * maxPriority; i++) {
             Message msg = sub.receive(10000);
-            assertNull("no duplicate message", dups.put(msg.getJMSMessageID(), subName));
-            LOG.info("received i=" + i + ", m=" + (msg!=null?
+            LOG.debug("received i=" + i + ", m=" + (msg!=null?
                     msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
                     : null) );
+            assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(),
subName));            
             assertNotNull("Message " + i + " was null", msg);
             messageCounts[msg.getJMSPriority()].incrementAndGet();
             if (i > 0 && i % closeFrequency == 0) {



Mime
View raw message