activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: [AMQ-6587] - fix root cause of deletion candidate seeing sub. contention between adding sub to dests and adding dests to sub, also removing sub from dests twice could leave dest stat negative allowing invalid candidate for gc
Date Fri, 10 Feb 2017 13:06:16 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 3e66eccb5 -> 727000f11


[AMQ-6587] - fix root cause of deletion candidate seeing sub. contention between adding sub
to dests and adding dests to sub, also removing sub from dests twice could leave dest stat
negative allowing invalid candidate for gc

(cherry picked from commit d86c98a68772aaa879e6a573fed1ab8382568c9e)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/727000f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/727000f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/727000f1

Branch: refs/heads/activemq-5.14.x
Commit: 727000f112b81aa3ca34087ee44a641b1d5088f6
Parents: 3e66ecc
Author: gtully <gary.tully@gmail.com>
Authored: Fri Feb 10 11:37:44 2017 +0000
Committer: Christopher L. Shannon <christopher.l.shannon@gmail.com>
Committed: Fri Feb 10 08:05:35 2017 -0500

----------------------------------------------------------------------
 .../activemq/broker/region/AbstractRegion.java  | 12 ++-
 .../activemq/broker/region/BaseDestination.java |  7 +-
 .../apache/activemq/broker/region/Topic.java    |  7 +-
 .../broker/region/DestinationGCStressTest.java  | 77 +++++++++++++++++++-
 4 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/727000f1/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 3f763e4..50cd324 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -267,11 +267,7 @@ public abstract class AbstractRegion implements Region {
             for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();)
{
                 Subscription sub = iter.next();
                 if (sub.matches(destination) ) {
-                    // may be a new sub created after gc decision, verify if really subscribed
-                    Destination toDelete  = destinations.get(destination);
-                    if (toDelete != null && toDelete.getDestinationStatistics().getConsumers().getCount()
> 0 ) {
-                        throw new JMSException("Destination still has an active subscription:
" + destination);
-                    }
+                    throw new JMSException("Destination still has an active subscription:
" + destination);
                 }
             }
         }
@@ -394,6 +390,8 @@ public abstract class AbstractRegion implements Region {
                 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination()))
{
                     addList.add(dest);
                 }
+                // ensure sub visible to any new dest addSubscriptionsForDestination
+                subscriptions.put(info.getConsumerId(), sub);
             } finally {
                 destinationsLock.readLock().unlock();
             }
@@ -416,6 +414,8 @@ public abstract class AbstractRegion implements Region {
                                 LOG.error("Error unsubscribing " + sub + " from " + remove
+ ": " + ex.getMessage(), ex);
                             }
                         }
+                        subscriptions.remove(info.getConsumerId());
+                        removeList.clear();
                         throw e;
                     }
                 }
@@ -426,8 +426,6 @@ public abstract class AbstractRegion implements Region {
                 ((QueueBrowserSubscription) sub).destinationsAdded();
             }
 
-            subscriptions.put(info.getConsumerId(), sub);
-
             return sub;
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/727000f1/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 8420839..419e7d0 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -263,6 +263,7 @@ public abstract class BaseDestination implements Destination {
     @Override
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception{
         destinationStatistics.getConsumers().decrement();
+        this.lastActiveTime=0l;
     }
 
 
@@ -298,9 +299,9 @@ public abstract class BaseDestination implements Destination {
 
     @Override
     public boolean isActive() {
-        boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
-                           destinationStatistics.getProducers().getCount() != 0;
-        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount()
!= 0) {
+        boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
+                           destinationStatistics.getProducers().getCount() > 0;
+        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount()
> 0) {
             isActive = hasRegularConsumers(getConsumers());
         }
         return isActive;

http://git-wip-us.apache.org/repos/asf/activemq/blob/727000f1/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 7707bf5..c553e8c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -192,9 +192,12 @@ public class Topic extends BaseDestination implements Task {
     @Override
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception {
         if (!sub.getConsumerInfo().isDurable()) {
-            super.removeSubscription(context, sub, lastDeliveredSequenceId);
+            boolean removed = false;
             synchronized (consumers) {
-                consumers.remove(sub);
+                removed = consumers.remove(sub);
+            }
+            if (removed) {
+                super.removeSubscription(context, sub, lastDeliveredSequenceId);
             }
         }
         sub.remove(context, this);

http://git-wip-us.apache.org/repos/asf/activemq/blob/727000f1/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
index 8f7b123..c6f8409 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 public class DestinationGCStressTest {
@@ -168,7 +169,7 @@ public class DestinationGCStressTest {
         log4jLogger.addAppender(appender);
         try {
 
-            final AtomicInteger max = new AtomicInteger(10000);
+            final AtomicInteger max = new AtomicInteger(20000);
 
             final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
             factory.setWatchTopicAdvisories(false);
@@ -201,13 +202,12 @@ public class DestinationGCStressTest {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-                    for (int i = 0; i < 100; i++) {
+                    for (int i = 0; i < 1000; i++) {
                         try {
                             MessageConsumer messageConsumer = session.createConsumer(new
ActiveMQTopic(">"));
                             messageConsumer.close();
 
                         } catch (Exception ignored) {
-                            ignored.printStackTrace();
                         }
                     }
                 }
@@ -226,4 +226,75 @@ public class DestinationGCStressTest {
         assertFalse("failed on unexpected log event", failed.get());
 
     }
+
+    @Test(timeout = 60000)
+    public void testAllDestsSeeSub() throws Exception {
+
+        final AtomicInteger foundDestWithMissingSub = new AtomicInteger(0);
+
+        final AtomicInteger max = new AtomicInteger(20000);
+
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        for (int i = 0; i < 1; i++) {
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Connection c = factory.createConnection();
+                        c.start();
+                        Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageProducer producer = s.createProducer(null);
+                        Message message = s.createTextMessage();
+                        int j;
+                        while ((j = max.decrementAndGet()) > 0) {
+                            producer.send(new ActiveMQTopic("A." + j), message);
+                        }
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < 1000; i++) {
+                    try {
+                        MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">"));
+                        if (destMissingSub(foundDestWithMissingSub)) {
+                            break;
+                        }
+                        messageConsumer.close();
+
+                    } catch (Exception ignored) {
+                    }
+                }
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(60, TimeUnit.SECONDS);
+        connection.close();
+
+        assertEquals("no dests missing sub", 0, foundDestWithMissingSub.get());
+
+    }
+
+    private boolean destMissingSub(AtomicInteger tally) {
+        for (Destination destination :
+                ((RegionBroker)brokerService.getRegionBroker()).getTopicRegion().getDestinationMap().values())
{
+            if (destination.getConsumers().isEmpty()) {
+                tally.incrementAndGet();
+                return true;
+            }
+        }
+        return false;
+    }
 }


Mime
View raw message