activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: [AMQ-6587] additional contention window with new sub while gc is in progress - need to verify new sub is actually subscribed to the candidate for deletion. Fix and test
Date Tue, 07 Feb 2017 15:04:53 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x c5315d6d5 -> 2d4fb4cba


[AMQ-6587] additional contention window with new sub while gc is in progress - need to verify
new sub is actually subscribed to the candidate for deletion. Fix and test

(cherry picked from commit 0ee4f5b84302daf8901363556d038e274c2defd5)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2d4fb4cb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2d4fb4cb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2d4fb4cb

Branch: refs/heads/activemq-5.14.x
Commit: 2d4fb4cbabb27f7fbb9ef5a86fa310d9741393c3
Parents: c5315d6
Author: gtully <gary.tully@gmail.com>
Authored: Tue Feb 7 13:53:40 2017 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Feb 7 10:04:44 2017 -0500

----------------------------------------------------------------------
 .../activemq/broker/region/AbstractRegion.java  |  8 +-
 .../broker/region/DestinationGCStressTest.java  | 78 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2d4fb4cb/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 6eb6e71..3f763e4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -266,8 +266,12 @@ public abstract class AbstractRegion implements Region {
         if (timeout == 0) {
             for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
                 Subscription sub = iter.next();
-                if (sub.matches(destination)) {
-                    throw new JMSException("Destination still has an active subscription: " + destination);
+                if (sub.matches(destination) ) {
+                    // may be a new sub created after gc decision, verify if really subscribed
+                    Destination toDelete  = destinations.get(destination);
+                    if (toDelete != null && toDelete.getDestinationStatistics().getConsumers().getCount() > 0 ) {
+                        throw new JMSException("Destination still has an active subscription: " + destination);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2d4fb4cb/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
index 80cd4be..8f7b123 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
@@ -148,4 +148,82 @@ public class DestinationGCStressTest {
         assertFalse("failed on unexpected log event", failed.get());
 
     }
+
+    @Test(timeout = 60000)
+    public void testAddRemoveWildcardWithGc() throws Exception {
+
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(RegionBroker.class);
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Failed to remove inactive")) {
+                    logger.info("received unexpected log message: " + event.getMessage());
+                    failed.set(true);
+                }
+            }
+        };
+        log4jLogger.addAppender(appender);
+        try {
+
+            final AtomicInteger max = new AtomicInteger(10000);
+
+            final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+            factory.setWatchTopicAdvisories(false);
+            Connection connection = factory.createConnection();
+            connection.start();
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            ExecutorService executorService = Executors.newCachedThreadPool();
+            for (int i = 0; i < 1; i++) {
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Connection c = factory.createConnection();
+                            c.start();
+                            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                            MessageProducer producer = s.createProducer(null);
+                            Message message = s.createTextMessage();
+                            int j;
+                            while ((j = max.decrementAndGet()) > 0) {
+                                producer.send(new ActiveMQTopic("A." + j), message);
+                            }
+                        } catch (Exception ignored) {
+                            ignored.printStackTrace();
+                        }
+                    }
+                });
+            }
+
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    for (int i = 0; i < 100; i++) {
+                        try {
+                            MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">"));
+                            messageConsumer.close();
+
+                        } catch (Exception ignored) {
+                            ignored.printStackTrace();
+                        }
+                    }
+                }
+            });
+
+            executorService.shutdown();
+            executorService.awaitTermination(60, TimeUnit.SECONDS);
+
+            logger.info("Done");
+
+            connection.close();
+
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+        assertFalse("failed on unexpected log event", failed.get());
+
+    }
 }


Mime
View raw message