Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 06881200C33 for ; Fri, 24 Feb 2017 20:24:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 05377160B69; Fri, 24 Feb 2017 19:24:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 04318160B62 for ; Fri, 24 Feb 2017 20:24:27 +0100 (CET) Received: (qmail 68040 invoked by uid 500); 24 Feb 2017 19:24:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 68019 invoked by uid 99); 24 Feb 2017 19:24:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Feb 2017 19:24:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0D4D7DFC1C; Fri, 24 Feb 2017 19:24:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Date: Fri, 24 Feb 2017 19:24:27 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq git commit: [AMQ-6567] ensure gc file removal/move is completed after index updates to avoid dangling index referenced on partial failure. Fix and test archived-at: Fri, 24 Feb 2017 19:24:29 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.14.x 0ee942058 -> 22d5b51a0 [AMQ-6567] ensure gc file removal/move is completed after index updates to avoid dangling index referenced on partial failure. Fix and test (cherry picked from commit 20522394cc747e64bd9f87e2e0b64d886c4dec62) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1a67318f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1a67318f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1a67318f Branch: refs/heads/activemq-5.14.x Commit: 1a67318fe97b1339ea1fbbd74cc7232c46e8f27c Parents: 0ee9420 Author: gtully Authored: Tue Jan 17 14:05:04 2017 +0000 Committer: Christopher L. Shannon (cshannon) Committed: Fri Feb 24 14:17:34 2017 -0500 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 17 +- .../store/kahadb/JournalArchiveTest.java | 221 +++++++++++++++++++ 2 files changed, 230 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/1a67318f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4c6ec36..2db07f1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1686,12 +1686,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { this.indexLock.writeLock().lock(); try { - pageFile.tx().execute(new Transaction.Closure() { + Set filesToGc = pageFile.tx().execute(new Transaction.CallableClosure, IOException>() { @Override - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, cleanup); + public Set execute(Transaction tx) throws IOException { + return checkpointUpdate(tx, cleanup); } }); + // after the index update such that partial removal does not leave dangling references in the index. + journal.removeDataFiles(filesToGc); } finally { this.indexLock.writeLock().unlock(); } @@ -1705,7 +1707,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @param tx * @throws IOException */ - void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { + Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { MDC.put("activemq.persistenceDir", getDirectory().getName()); LOG.debug("Checkpoint started."); @@ -1720,10 +1722,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); + final TreeSet gcCandidateSet = new TreeSet<>(); if (cleanup) { final TreeSet completeFileSet = new TreeSet<>(journal.getFileMap().keySet()); - final TreeSet gcCandidateSet = new TreeSet<>(completeFileSet); + gcCandidateSet.addAll(completeFileSet); if (LOG.isTraceEnabled()) { LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); @@ -1895,7 +1898,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (!gcCandidateSet.isEmpty()) { LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); - journal.removeDataFiles(gcCandidateSet); for (Integer candidate : gcCandidateSet) { for (Set ackFiles : metadata.ackMessageFileMap.values()) { ackMessageFileMapMod |= ackFiles.remove(candidate); @@ -1941,6 +1943,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe MDC.remove("activemq.persistenceDir"); LOG.debug("Checkpoint done."); + return gcCandidateSet; } private final class AckCompactionRunner implements Runnable { @@ -2701,8 +2704,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe /** * Locate the storeMessageSize counter for this KahaDestination - * @param kahaDestination - * @return */ protected MessageStoreStatistics getStoreStats(String kahaDestKey) { MessageStoreStatistics storeStats = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/1a67318f/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java new file mode 100644 index 0000000..35750a6 --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; +import java.io.IOException; +import java.security.Permission; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain; +import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_ARCHIVE_DIRECTORY; +import static org.junit.Assert.*; + +public class JournalArchiveTest { + + private static final Logger LOG = LoggerFactory.getLogger(JournalArchiveTest.class); + + private final String KAHADB_DIRECTORY = "target/activemq-data/"; + private final String payload = new String(new byte[1024]); + + private BrokerService broker = null; + private final Destination destination = new ActiveMQQueue("Test"); + private KahaDBPersistenceAdapter adapter; + + protected void startBroker() throws Exception { + doStartBroker(true); + } + + protected void restartBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + + doStartBroker(false); + } + + private void doStartBroker(boolean delete) throws Exception { + doCreateBroker(delete); + LOG.info("Starting broker.."); + broker.start(); + } + + private void doCreateBroker(boolean delete) throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(delete); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setDataDirectory(KAHADB_DIRECTORY); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setUseCache(false); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + + configurePersistence(broker); + } + + protected void configurePersistence(BrokerService brokerService) throws Exception { + adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); + + // ensure there are a bunch of data files but multiple entries in each + adapter.setJournalMaxFileLength(1024 * 20); + + // speed up the test case, checkpoint an cleanup early and often + adapter.setCheckpointInterval(2000); + adapter.setCleanupInterval(2000); + + adapter.setCheckForCorruptJournalFiles(true); + + adapter.setArchiveDataLogs(true); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + + @Test + public void testRecoveryOnArchiveFailure() throws Exception { + final AtomicInteger atomicInteger = new AtomicInteger(); + + System.setSecurityManager(new SecurityManager() { + public void checkPermission(Permission perm) {} + public void checkPermission(Permission perm, Object context) {} + + public void checkWrite(String file) { + if (file.contains(DEFAULT_ARCHIVE_DIRECTORY) && atomicInteger.incrementAndGet() > 4) { + throw new SecurityException("No Perms to write to archive times:" + atomicInteger.get()); + } + } + }); + startBroker(); + + int sent = produceMessagesToConsumeMultipleDataFiles(50); + + int numFilesAfterSend = getNumberOfJournalFiles(); + LOG.info("Num journal files: " + numFilesAfterSend); + + assertTrue("more than x files: " + numFilesAfterSend, numFilesAfterSend > 4); + + final CountDownLatch gotShutdown = new CountDownLatch(1); + broker.addShutdownHook(new Runnable() { + @Override + public void run() { + gotShutdown.countDown(); + } + }); + + int received = tryConsume(destination, sent); + assertEquals("all message received", sent, received); + assertTrue("broker got shutdown on page in error", gotShutdown.await(10, TimeUnit.SECONDS)); + + // no restrictions + System.setSecurityManager(null); + + int numFilesAfterRestart = 0; + try { + // ensure we can restart after failure to archive + doStartBroker(false); + numFilesAfterRestart = getNumberOfJournalFiles(); + LOG.info("Num journal files before gc: " + numFilesAfterRestart); + + // force gc + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().checkpoint(true); + + } catch (Exception error) { + LOG.error("Failed to restart!", error); + fail("Failed to restart after failure to archive"); + } + int numFilesAfterGC = getNumberOfJournalFiles(); + LOG.info("Num journal files after restart nd gc: " + numFilesAfterGC); + assertTrue("Gc has happened", numFilesAfterGC < numFilesAfterRestart); + assertTrue("Gc has worked", numFilesAfterGC < 4); + + File archiveDirectory = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getDirectoryArchive(); + assertEquals("verify files in archive dir", numFilesAfterSend, archiveDirectory.listFiles().length); + } + + + private int getNumberOfJournalFiles() throws IOException { + Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + return reality; + } + + private int produceMessages(Destination destination, int numToSend) throws Exception { + int sent = 0; + Connection connection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < numToSend; i++) { + producer.send(createMessage(session, i)); + sent++; + } + } finally { + connection.close(); + } + + return sent; + } + + private int tryConsume(Destination destination, int numToGet) throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + return drain(cf, destination, numToGet); + } + + private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { + return produceMessages(destination, numToSend); + } + + private Message createMessage(Session session, int i) throws Exception { + return session.createTextMessage(payload + "::" + i); + } +}