activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5875
Date Fri, 17 Jul 2015 15:57:27 GMT
Repository: activemq
Updated Branches:
  refs/heads/master e0c2c177c -> a439a0c6b


https://issues.apache.org/jira/browse/AMQ-5875

Fixing a regression that caused a network bridge to recreate durable
demand improperly.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2117768e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2117768e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2117768e

Branch: refs/heads/master
Commit: 2117768e0a6c7bab0225f5ba4e960bfb443188c7
Parents: e0c2c17
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Fri Jul 10 16:43:26 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Jul 17 16:34:06 2015 +0100

----------------------------------------------------------------------
 .../activemq/network/DurableConduitBridge.java  |  23 ++-
 .../kahadb/AbstractMultiKahaDBDeletionTest.java | 202 +++++++++++++++++++
 .../kahadb/MultiKahaDBQueueDeletionTest.java    |  91 +++++++++
 .../kahadb/MultiKahaDBTopicDeletionTest.java    | 176 ++--------------
 4 files changed, 326 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 621972c..afcb42d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -18,6 +18,7 @@ package org.apache.activemq.network;
 
 import java.io.IOException;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 public class DurableConduitBridge extends ConduitBridge {
     private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
 
+    @Override
     public String toString() {
         return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
     }
@@ -52,6 +54,7 @@ public class DurableConduitBridge extends ConduitBridge {
      * Subscriptions for these destinations are always created
      *
      */
+    @Override
     protected void setupStaticDestinations() {
         super.setupStaticDestinations();
         ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
@@ -60,11 +63,22 @@ public class DurableConduitBridge extends ConduitBridge {
                 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
                     DemandSubscription sub = createDemandSubscription(dest);
                     sub.setStaticallyIncluded(true);
-                    if (dest.isTopic()) {
-                        sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
-                    }
                     try {
-                        addSubscription(sub);
+                        //Filtering by non-empty subscriptions, see AMQ-5875
+                        if (dest.isTopic()) {
+                            sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
+                            for (Subscription subscription : this.getRegionSubscriptions(dest))
{
+                                String clientId = subscription.getContext().getClientId();
+                                String subName = subscription.getConsumerInfo().getSubscriptionName();
+                                if (clientId != null && clientId.equals(sub.getLocalInfo().getClientId())
+                                        && subName != null && subName.equals(sub.getLocalInfo().getSubscriptionName()))
{
+                                    addSubscription(sub);
+                                    break;
+                                }
+                            }
+                        } else {
+                            addSubscription(sub);
+                        }
                     } catch (IOException e) {
                         LOG.error("Failed to add static destination {}", dest, e);
                     }
@@ -74,6 +88,7 @@ public class DurableConduitBridge extends ConduitBridge {
         }
     }
 
+    @Override
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException
{
         if (addToAlreadyInterestedConsumers(info)) {
             return null; // don't want this subscription added

http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java
new file mode 100644
index 0000000..e1ba0ff
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractMultiKahaDBDeletionTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class AbstractMultiKahaDBDeletionTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(MultiKahaDBTopicDeletionTest.class);
+
+    protected BrokerService brokerService;
+    protected Broker broker;
+    protected URI brokerConnectURI;
+    protected File storeDir;
+    protected ActiveMQDestination dest1;
+    protected ActiveMQDestination dest2;
+
+    public AbstractMultiKahaDBDeletionTest(ActiveMQDestination dest1, ActiveMQDestination
dest2) {
+        this.dest1 = dest1;
+        this.dest2 = dest2;
+    }
+
+    @Rule
+    public TemporaryFolder tempTestDir = new TemporaryFolder();
+
+    @Before
+    public void startBroker() throws Exception {
+        setUpBroker(true);
+    }
+
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+        brokerService = new BrokerService();
+        this.initPersistence(brokerService);
+        // set up a transport
+        TransportConnector connector = brokerService
+                .addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        brokerConnectURI = brokerService.getConnectorByName("tcp")
+                .getConnectUri();
+        broker = brokerService.getBroker();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        storeDir = tempTestDir.getRoot();
+        brokerService.setPersistent(true);
+
+        // setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(storeDir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 512);
+
+        // set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+        brokerService.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    /**
+     * Test that a destination can be deleted and the other destination can still be subscribed
to
+     * @throws Exception
+     */
+    @Test
+    public void testDest1Deletion() throws Exception {
+        LOG.info("Creating {} first, {} second", dest1, dest2);
+        LOG.info("Removing {}, subscribing to {}", dest1, dest2);
+
+        // Create two destinations
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
+
+        // remove destination2
+        broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100);
+
+        // try and create a consumer on dest2, before AMQ-5875 this
+        //would cause an IllegalStateException for Topics
+        createConsumer(dest2);
+        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"),
getStoreFileFilter());
+        assertTrue("Store index should still exist", storeFiles.size() >= 1);
+    }
+
+
+    @Test
+    public void testDest2Deletion() throws Exception {
+        LOG.info("Creating {} first, {} second", dest1, dest2);
+        LOG.info("Removing {}, subscribing to {}", dest2, dest1);
+
+        // Create two destinations
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
+
+        // remove destination2
+        broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100);
+
+        // try and create a consumer on dest1, before AMQ-5875 this
+        //would cause an IllegalStateException for Topics
+        createConsumer(dest1);
+        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"),
getStoreFileFilter());
+        assertTrue("Store index should still exist", storeFiles.size() >= 1);
+    }
+
+
+    @Test
+    public void testStoreCleanupDeleteDest1First() throws Exception {
+        LOG.info("Creating {} first, {} second", dest1, dest2);
+        LOG.info("Deleting {} first, {} second", dest1, dest2);
+
+        // Create two destinations
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
+
+        // remove both destinations
+        broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100);
+        broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100);
+
+        //Assert that with no more destinations attached to a store that it has been cleaned
up
+        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"),
getStoreFileFilter());
+        assertEquals("Store files should be deleted", 0, storeFiles.size());
+
+    }
+
+    @Test
+    public void testStoreCleanupDeleteDest2First() throws Exception {
+        LOG.info("Creating {} first, {} second", dest1, dest2);
+        LOG.info("Deleting {} first, {} second", dest2, dest1);
+
+        // Create two destinations
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
+
+        // remove both destinations
+        broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100);
+        broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100);
+
+        //Assert that with no more destinations attached to a store that it has been cleaned
up
+        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"),
getStoreFileFilter());
+        assertEquals("Store files should be deleted", 0, storeFiles.size());
+
+    }
+
+
+    protected abstract void createConsumer(ActiveMQDestination dest) throws JMSException;
+
+    protected abstract WildcardFileFilter getStoreFileFilter();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java
new file mode 100644
index 0000000..f4499f3
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBQueueDeletionTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQ-5875
+ *
+ * This test shows that when multiple destinations share a single KahaDB
+ * instance when using mKahaDB, that the deletion of one Queue will not cause
+ * the store to be deleted if another destination is still attached.  This
+ * issue was related to Topics but this test makes sure Queues work as well.
+ *
+ * */
+@RunWith(Parameterized.class)
+public class MultiKahaDBQueueDeletionTest extends AbstractMultiKahaDBDeletionTest {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(MultiKahaDBTopicDeletionTest.class);
+
+    protected static ActiveMQQueue QUEUE1 = new ActiveMQQueue("test.>");
+    protected static ActiveMQQueue QUEUE2 = new ActiveMQQueue("test.t.queue");
+
+    @Parameters
+    public static Collection<Object[]> data() {
+
+        //Test with queues created in different orders
+        return Arrays.asList(new Object[][] {
+                {QUEUE1, QUEUE2},
+                {QUEUE2, QUEUE1}
+        });
+    }
+
+    public MultiKahaDBQueueDeletionTest(ActiveMQQueue dest1, ActiveMQQueue dest2) {
+        super(dest1, dest2);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.store.kahadb.AbstractMultiKahaDBDeletionTest#createConsumer(org.apache.activemq.command.ActiveMQDestination)
+     */
+    @Override
+    protected void createConsumer(ActiveMQDestination dest) throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                brokerConnectURI);
+        Connection connection = factory.createConnection();
+        connection.setClientID("client1");
+        connection.start();
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        session.createConsumer(dest);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.store.kahadb.AbstractMultiKahaDBDeletionTest#getStoreFileFilter()
+     */
+    @Override
+    protected WildcardFileFilter getStoreFileFilter() {
+        return new WildcardFileFilter("queue*");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2117768e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
index 4380f5a..02f20c5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
@@ -16,32 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Session;
+import javax.jms.Topic;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -58,21 +44,13 @@ import org.slf4j.LoggerFactory;
  *
  * */
 @RunWith(Parameterized.class)
-public class MultiKahaDBTopicDeletionTest {
+public class MultiKahaDBTopicDeletionTest extends AbstractMultiKahaDBDeletionTest {
     protected static final Logger LOG = LoggerFactory
             .getLogger(MultiKahaDBTopicDeletionTest.class);
 
-    protected BrokerService brokerService;
-    protected Broker broker;
-    protected URI brokerConnectURI;
-    protected File storeDir;
-    protected ActiveMQTopic topic1;
-    protected ActiveMQTopic topic2;
-
     protected static ActiveMQTopic TOPIC1 = new ActiveMQTopic("test.>");
     protected static ActiveMQTopic TOPIC2 = new ActiveMQTopic("test.t.topic");
 
-
     @Parameters
     public static Collection<Object[]> data() {
 
@@ -83,144 +61,13 @@ public class MultiKahaDBTopicDeletionTest {
         });
     }
 
-    public MultiKahaDBTopicDeletionTest(ActiveMQTopic topic1, ActiveMQTopic topic2) {
-        this.topic1 = topic1;
-        this.topic2 = topic2;
-    }
-
-    @Rule
-    public TemporaryFolder tempTestDir = new TemporaryFolder();
-
-    @Before
-    public void startBroker() throws Exception {
-        setUpBroker(true);
-    }
-
-    protected void setUpBroker(boolean clearDataDir) throws Exception {
-        brokerService = new BrokerService();
-        this.initPersistence(brokerService);
-        // set up a transport
-        TransportConnector connector = brokerService
-                .addConnector(new TransportConnector());
-        connector.setUri(new URI("tcp://0.0.0.0:0"));
-        connector.setName("tcp");
-
-        brokerService.start();
-        brokerService.waitUntilStarted();
-        brokerConnectURI = brokerService.getConnectorByName("tcp")
-                .getConnectUri();
-        broker = brokerService.getBroker();
-    }
-
-    @After
-    public void stopBroker() throws Exception {
-        brokerService.stop();
-        brokerService.waitUntilStopped();
-    }
-
-    protected void initPersistence(BrokerService brokerService)
-            throws IOException {
-        storeDir = tempTestDir.getRoot();
-        brokerService.setPersistent(true);
-
-        // setup multi-kaha adapter
-        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
-        persistenceAdapter.setDirectory(storeDir);
-
-        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
-        kahaStore.setJournalMaxFileLength(1024 * 512);
-
-        // set up a store per destination
-        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
-        filtered.setPersistenceAdapter(kahaStore);
-        filtered.setPerDestination(true);
-        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
-        stores.add(filtered);
-
-        persistenceAdapter.setFilteredPersistenceAdapters(stores);
-        brokerService.setPersistenceAdapter(persistenceAdapter);
-    }
-
-    /**
-     * Test that a topic can be deleted and the other topic can still be subscribed to
-     * @throws Exception
-     */
-    @Test
-    public void testTopic1Deletion() throws Exception {
-        LOG.info("Creating {} first, {} second", topic1, topic2);
-        LOG.info("Removing {}, subscribing to {}", topic1, topic2);
-
-        // Create two topics
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
-
-        // remove topic2
-        broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
-
-        // try and create a subscription on topic2, before AMQ-5875 this
-        //would cause an IllegalStateException
-        createSubscriber(topic2);
-    }
-
-
-    @Test
-    public void testTopic2Deletion() throws Exception {
-        LOG.info("Creating {} first, {} second", topic1, topic2);
-        LOG.info("Removing {}, subscribing to {}", topic2, topic1);
-
-        // Create two topics
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
-
-        // remove topic2
-        broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
-
-        // try and create a subscription on topic1, before AMQ-5875 this
-        //would cause an IllegalStateException
-        createSubscriber(topic1);
-    }
-
-
-    @Test
-    public void testStoreCleanupDeleteTopic1First() throws Exception {
-        LOG.info("Creating {} first, {} second", topic1, topic2);
-        LOG.info("Deleting {} first, {} second", topic1, topic2);
-
-        // Create two topics
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
-
-        // remove both topics
-        broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
-        broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
-
-        //Assert that with no more destinations attached to a store that it has been cleaned
up
-        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"),
new WildcardFileFilter("topic*"));
-        assertEquals("Store files should be deleted", 0, storeFiles.size());
-
-    }
-
-    @Test
-    public void testStoreCleanupDeleteTopic2First() throws Exception {
-        LOG.info("Creating {} first, {} second", topic1, topic2);
-        LOG.info("Deleting {} first, {} second", topic2, topic1);
-
-        // Create two topics
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
-        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
-
-        // remove both topics
-        broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
-        broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
-
-        //Assert that with no more destinations attached to a store that it has been cleaned
up
-        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"),
new WildcardFileFilter("topic*"));
-        assertEquals("Store files should be deleted", 0, storeFiles.size());
-
+    public MultiKahaDBTopicDeletionTest(ActiveMQTopic dest1,
+            ActiveMQTopic dest2) {
+        super(dest1, dest2);
     }
 
-
-    protected void createSubscriber(ActiveMQTopic topic) throws JMSException {
+    @Override
+    protected void createConsumer(ActiveMQDestination dest) throws JMSException {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 brokerConnectURI);
         Connection connection = factory.createConnection();
@@ -228,7 +75,12 @@ public class MultiKahaDBTopicDeletionTest {
         connection.start();
         Session session = connection.createSession(false,
                 Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "sub1");
+        session.createDurableSubscriber((Topic) dest, "sub1");
+    }
+
+    @Override
+    protected WildcardFileFilter getStoreFileFilter() {
+        return new WildcardFileFilter("topic*");
     }
 
 }


Mime
View raw message