activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5875
Date Wed, 08 Jul 2015 21:18:27 GMT
Repository: activemq
Updated Branches:
  refs/heads/master c853bcf43 -> b0952d874


https://issues.apache.org/jira/browse/AMQ-5875

Resolves an issue when using mKahaDB that caused a MessageStore
being used by more than one destination to be deleted even though
there was still at least 1 other destination using the store.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/73d1bcd7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/73d1bcd7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/73d1bcd7

Branch: refs/heads/master
Commit: 73d1bcd7ac4fe7bd94da9fe178f6f17d30bc41f2
Parents: c853bcf
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Wed Jul 8 13:02:43 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Wed Jul 8 18:32:15 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      |  18 +-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   |  26 +--
 .../kahadb/MultiKahaDBTopicDeletionTest.java    | 234 +++++++++++++++++++
 3 files changed, 248 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/73d1bcd7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 44f93a6..89ee40c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1049,23 +1049,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
                                 .hasNext();) {
                             Entry<String, StoredDestination> entry = iterator.next();
-                            if (!isEmptyTopic(entry, tx)) {
-                                rc.add(convert(entry.getKey()));
-                            }
-                        }
-                    }
-
-                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
-                            throws IOException {
-                        boolean isEmptyTopic = false;
-                        ActiveMQDestination dest = convert(entry.getKey());
-                        if (dest.isTopic()) {
-                            StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
-                            if (loadedStore.subscriptionAcks.isEmpty(tx)) {
-                                isEmptyTopic = true;
-                            }
+                            //Removing isEmpty topic check - see AMQ-5875
+                            rc.add(convert(entry.getKey()));
                         }
-                        return isEmptyTopic;
                     }
                 });
             }finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/73d1bcd7/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 286931e..d46108a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -277,7 +277,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        if (adapter instanceof PersistenceAdapter) {
+        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
             adapter.removeQueueMessageStore(destination);
             removeMessageStore(adapter, destination);
             destinationMap.removeAll(destination);
@@ -292,7 +292,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        if (adapter instanceof PersistenceAdapter) {
+        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
             adapter.removeTopicMessageStore(destination);
             removeMessageStore(adapter, destination);
             destinationMap.removeAll(destination);
@@ -300,18 +300,16 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
     }
 
     private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
-        if (adapter.getDestinations().isEmpty()) {
-            stopAdapter(adapter, destination.toString());
-            File adapterDir = adapter.getDirectory();
-            if (adapterDir != null) {
-                if (IOHelper.deleteFile(adapterDir)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.info("deleted per destination adapter directory for: " + destination);
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.info("failed to deleted per destination adapter directory for: " + destination);
-                    }
+        stopAdapter(adapter, destination.toString());
+        File adapterDir = adapter.getDirectory();
+        if (adapterDir != null) {
+            if (IOHelper.deleteFile(adapterDir)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.info("deleted per destination adapter directory for: " + destination);
+                }
+            } else {
+                if (LOG.isTraceEnabled()) {
+                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/73d1bcd7/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
new file mode 100644
index 0000000..4380f5a
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBTopicDeletionTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQ-5875
+ *
+ * This test shows that when multiple destinations share a single KahaDB
+ * instance when using mKahaDB, that the deletion of one Topic will no longer
+ * cause an IllegalStateException and the store will be properly kept around
+ * until all destinations associated with the store are gone.
+ *
+ * */
+@RunWith(Parameterized.class)
+public class MultiKahaDBTopicDeletionTest {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(MultiKahaDBTopicDeletionTest.class);
+
+    protected BrokerService brokerService;
+    protected Broker broker;
+    protected URI brokerConnectURI;
+    protected File storeDir;
+    protected ActiveMQTopic topic1;
+    protected ActiveMQTopic topic2;
+
+    protected static ActiveMQTopic TOPIC1 = new ActiveMQTopic("test.>");
+    protected static ActiveMQTopic TOPIC2 = new ActiveMQTopic("test.t.topic");
+
+
+    @Parameters
+    public static Collection<Object[]> data() {
+
+        //Test with topics created in different orders
+        return Arrays.asList(new Object[][] {
+                {TOPIC1, TOPIC2},
+                {TOPIC2, TOPIC1}
+        });
+    }
+
+    public MultiKahaDBTopicDeletionTest(ActiveMQTopic topic1, ActiveMQTopic topic2) {
+        this.topic1 = topic1;
+        this.topic2 = topic2;
+    }
+
+    @Rule
+    public TemporaryFolder tempTestDir = new TemporaryFolder();
+
+    @Before
+    public void startBroker() throws Exception {
+        setUpBroker(true);
+    }
+
+    protected void setUpBroker(boolean clearDataDir) throws Exception {
+        brokerService = new BrokerService();
+        this.initPersistence(brokerService);
+        // set up a transport
+        TransportConnector connector = brokerService
+                .addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        brokerConnectURI = brokerService.getConnectorByName("tcp")
+                .getConnectUri();
+        broker = brokerService.getBroker();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    protected void initPersistence(BrokerService brokerService)
+            throws IOException {
+        storeDir = tempTestDir.getRoot();
+        brokerService.setPersistent(true);
+
+        // setup multi-kaha adapter
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(storeDir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setJournalMaxFileLength(1024 * 512);
+
+        // set up a store per destination
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+        brokerService.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    /**
+     * Test that a topic can be deleted and the other topic can still be subscribed to
+     * @throws Exception
+     */
+    @Test
+    public void testTopic1Deletion() throws Exception {
+        LOG.info("Creating {} first, {} second", topic1, topic2);
+        LOG.info("Removing {}, subscribing to {}", topic1, topic2);
+
+        // Create two topics
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+        // remove topic1
+        broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
+
+        // try and create a subscription on topic2, before AMQ-5875 this
+        //would cause an IllegalStateException
+        createSubscriber(topic2);
+    }
+
+
+    @Test
+    public void testTopic2Deletion() throws Exception {
+        LOG.info("Creating {} first, {} second", topic1, topic2);
+        LOG.info("Removing {}, subscribing to {}", topic2, topic1);
+
+        // Create two topics
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+        // remove topic2
+        broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
+
+        // try and create a subscription on topic1, before AMQ-5875 this
+        //would cause an IllegalStateException
+        createSubscriber(topic1);
+    }
+
+
+    @Test
+    public void testStoreCleanupDeleteTopic1First() throws Exception {
+        LOG.info("Creating {} first, {} second", topic1, topic2);
+        LOG.info("Deleting {} first, {} second", topic1, topic2);
+
+        // Create two topics
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+        // remove both topics
+        broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
+        broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
+
+        //Assert that with no more destinations attached to a store that it has been cleaned up
+        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*"));
+        assertEquals("Store files should be deleted", 0, storeFiles.size());
+
+    }
+
+    @Test
+    public void testStoreCleanupDeleteTopic2First() throws Exception {
+        LOG.info("Creating {} first, {} second", topic1, topic2);
+        LOG.info("Deleting {} first, {} second", topic2, topic1);
+
+        // Create two topics
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
+        broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
+
+        // remove both topics
+        broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
+        broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
+
+        //Assert that with no more destinations attached to a store that it has been cleaned up
+        Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*"));
+        assertEquals("Store files should be deleted", 0, storeFiles.size());
+
+    }
+
+
+    protected void createSubscriber(ActiveMQTopic topic) throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                brokerConnectURI);
+        Connection connection = factory.createConnection();
+        connection.setClientID("client1");
+        connection.start();
+        Session session = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "sub1");
+    }
+
+}


Mime
View raw message