activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/2] activemq git commit: [AMQ-6567] ensure gc file removal/move is completed after index updates to avoid dangling index referenced on partial failure. Fix and test
Date Fri, 24 Feb 2017 19:24:27 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 0ee942058 -> 22d5b51a0


[AMQ-6567] ensure gc file removal/move is completed after index updates to avoid dangling
index referenced on partial failure. Fix and test

(cherry picked from commit 20522394cc747e64bd9f87e2e0b64d886c4dec62)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1a67318f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1a67318f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1a67318f

Branch: refs/heads/activemq-5.14.x
Commit: 1a67318fe97b1339ea1fbbd74cc7232c46e8f27c
Parents: 0ee9420
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jan 17 14:05:04 2017 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Feb 24 14:17:34 2017 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  17 +-
 .../store/kahadb/JournalArchiveTest.java        | 221 +++++++++++++++++++
 2 files changed, 230 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1a67318f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 4c6ec36..2db07f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1686,12 +1686,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
             this.indexLock.writeLock().lock();
             try {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
                     @Override
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, cleanup);
+                    public Set<Integer> execute(Transaction tx) throws IOException {
+                        return checkpointUpdate(tx, cleanup);
                     }
                 });
+                // after the index update such that partial removal does not leave dangling references in the index.
+                journal.removeDataFiles(filesToGc);
             } finally {
                 this.indexLock.writeLock().unlock();
             }
@@ -1705,7 +1707,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      * @param tx
      * @throws IOException
      */
-    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
+    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
         MDC.put("activemq.persistenceDir", getDirectory().getName());
         LOG.debug("Checkpoint started.");
 
@@ -1720,10 +1722,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         tx.store(metadata.page, metadataMarshaller, true);
         pageFile.flush();
 
+        final TreeSet<Integer> gcCandidateSet = new TreeSet<>();
         if (cleanup) {
 
             final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
-            final TreeSet<Integer> gcCandidateSet = new TreeSet<>(completeFileSet);
+            gcCandidateSet.addAll(completeFileSet);
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
@@ -1895,7 +1898,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
             if (!gcCandidateSet.isEmpty()) {
                 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
-                journal.removeDataFiles(gcCandidateSet);
                 for (Integer candidate : gcCandidateSet) {
                     for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
                         ackMessageFileMapMod |= ackFiles.remove(candidate);
@@ -1941,6 +1943,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         MDC.remove("activemq.persistenceDir");
 
         LOG.debug("Checkpoint done.");
+        return gcCandidateSet;
     }
 
     private final class AckCompactionRunner implements Runnable {
@@ -2701,8 +2704,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     /**
      * Locate the storeMessageSize counter for this KahaDestination
-     * @param kahaDestination
-     * @return
      */
     protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
         MessageStoreStatistics storeStats = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/1a67318f/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java
new file mode 100644
index 0000000..35750a6
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.io.IOException;
+import java.security.Permission;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain;
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_ARCHIVE_DIRECTORY;
+import static org.junit.Assert.*;
+
+public class JournalArchiveTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalArchiveTest.class);
+
+    private final String KAHADB_DIRECTORY = "target/activemq-data/";
+    private final String payload = new String(new byte[1024]);
+
+    private BrokerService broker = null;
+    private final Destination destination = new ActiveMQQueue("Test");
+    private KahaDBPersistenceAdapter adapter;
+
+    protected void startBroker() throws Exception {
+        doStartBroker(true);
+    }
+
+    protected void restartBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+        doStartBroker(false);
+    }
+
+    private void doStartBroker(boolean delete) throws Exception {
+        doCreateBroker(delete);
+        LOG.info("Starting broker..");
+        broker.start();
+    }
+
+    private void doCreateBroker(boolean delete) throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDataDirectory(KAHADB_DIRECTORY);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setUseCache(false);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        configurePersistence(broker);
+    }
+
+    protected void configurePersistence(BrokerService brokerService) throws Exception {
+        adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint an cleanup early and often
+        adapter.setCheckpointInterval(2000);
+        adapter.setCleanupInterval(2000);
+
+        adapter.setCheckForCorruptJournalFiles(true);
+
+        adapter.setArchiveDataLogs(true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+
+    @Test
+    public void testRecoveryOnArchiveFailure() throws Exception {
+        final AtomicInteger atomicInteger = new AtomicInteger();
+
+        System.setSecurityManager(new SecurityManager() {
+            public void checkPermission(Permission perm) {}
+            public void checkPermission(Permission perm, Object context) {}
+
+            public void checkWrite(String file) {
+                if (file.contains(DEFAULT_ARCHIVE_DIRECTORY) && atomicInteger.incrementAndGet() > 4) {
+                    throw new SecurityException("No Perms to write to archive times:" + atomicInteger.get());
+                }
+            }
+        });
+        startBroker();
+
+        int sent = produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFilesAfterSend = getNumberOfJournalFiles();
+        LOG.info("Num journal files: " + numFilesAfterSend);
+
+        assertTrue("more than x files: " + numFilesAfterSend, numFilesAfterSend > 4);
+
+        final CountDownLatch gotShutdown = new CountDownLatch(1);
+        broker.addShutdownHook(new Runnable() {
+            @Override
+            public void run() {
+                gotShutdown.countDown();
+            }
+        });
+
+        int received = tryConsume(destination, sent);
+        assertEquals("all message received", sent, received);
+        assertTrue("broker got shutdown on page in error", gotShutdown.await(10, TimeUnit.SECONDS));
+
+        // no restrictions
+        System.setSecurityManager(null);
+
+        int numFilesAfterRestart = 0;
+        try {
+            // ensure we can restart after failure to archive
+            doStartBroker(false);
+            numFilesAfterRestart = getNumberOfJournalFiles();
+            LOG.info("Num journal files before gc: " + numFilesAfterRestart);
+
+            // force gc
+            ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().checkpoint(true);
+
+        } catch (Exception error) {
+            LOG.error("Failed to restart!", error);
+            fail("Failed to restart after failure to archive");
+        }
+        int numFilesAfterGC = getNumberOfJournalFiles();
+        LOG.info("Num journal files after restart nd gc: " + numFilesAfterGC);
+        assertTrue("Gc has happened", numFilesAfterGC < numFilesAfterRestart);
+        assertTrue("Gc has worked", numFilesAfterGC < 4);
+
+        File archiveDirectory = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getDirectoryArchive();
+        assertEquals("verify files in archive dir", numFilesAfterSend, archiveDirectory.listFiles().length);
+    }
+
+
+    private int getNumberOfJournalFiles() throws IOException {
+        Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+        return reality;
+    }
+
+    private int produceMessages(Destination destination, int numToSend) throws Exception {
+        int sent = 0;
+        Connection connection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < numToSend; i++) {
+                producer.send(createMessage(session, i));
+                sent++;
+            }
+        } finally {
+            connection.close();
+        }
+
+        return sent;
+    }
+
+    private int tryConsume(Destination destination, int numToGet) throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        return  drain(cf, destination, numToGet);
+    }
+
+    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+        return produceMessages(destination, numToSend);
+    }
+
+    private Message createMessage(Session session, int i) throws Exception {
+        return session.createTextMessage(payload + "::" + i);
+    }
+}


Mime
View raw message