activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5300 - fix and test that verifies recovery in the absence of an index
Date Wed, 07 Jan 2015 17:11:43 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk c5cebd5ec -> b54606b12


https://issues.apache.org/jira/browse/AMQ-5300 - fix and test that verifies recovery in the
absence of an index


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b54606b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b54606b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b54606b1

Branch: refs/heads/trunk
Commit: b54606b12417c371af6589b96b72f7110de26b34
Parents: f1df9f8
Author: gtully <gary.tully@gmail.com>
Authored: Wed Jan 7 17:10:10 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Jan 7 17:11:09 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBClient.scala |  12 +-
 .../activemq/leveldb/test/IndexRebuildTest.java | 128 +++++++++++++++++++
 .../leveldb/test/ReplicationTestSupport.java    |  16 ++-
 3 files changed, 151 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b54606b1/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 64bbcee..db6d653 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -559,7 +559,17 @@ class LevelDBClient(store: LevelDBStore) {
     might_fail {
       log.open()
     }
-    replay_from(lastIndexSnapshotPos, log.appender_limit)
+
+    var startPosition = lastIndexSnapshotPos;
+    // if we cannot locate a log for a snapshot, replay from
+    // first entry of first available log
+    if (log.log_info(startPosition).isEmpty) {
+        if (!log.log_infos.isEmpty) {
+          startPosition = log.log_infos.firstKey();
+        }
+    }
+
+    replay_from(startPosition, log.appender_limit)
     replay_write_batch = null;
   }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/b54606b1/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java
new file mode 100644
index 0000000..a9be570
--- /dev/null
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.LevelDBStoreView;
+import org.apache.activemq.leveldb.util.FileSupport;
+import org.apache.activemq.store.MessageStore;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IndexRebuildTest {
+    protected static final Logger LOG = LoggerFactory.getLogger(IndexRebuildTest.class);
+    final int max = 30;
+    final int toLeave = 5;
+    ArrayList<LevelDBStore> stores = new ArrayList<LevelDBStore>();
+
+    @Test(timeout = 1000 * 60 * 10)
+    public void testRebuildIndex() throws Exception {
+
+        File masterDir = new File("target/activemq-data/leveldb-rebuild");
+        FileSupport.toRichFile(masterDir).recursiveDelete();
+
+        final LevelDBStore store = new LevelDBStore();
+        store.setDirectory(masterDir);
+        store.setLogDirectory(masterDir);
+
+        store.setLogSize(1024 * 10);
+        store.start();
+        stores.add(store);
+
+        ArrayList<MessageId> inserts = new ArrayList<MessageId>();
+        MessageStore ms = store.createQueueMessageStore(new ActiveMQQueue("TEST"));
+        for (int i = 0; i < max; i++) {
+            inserts.add(addMessage(ms, "m" + i).getMessageId());
+        }
+        int logFileCount = countLogFiles(store);
+        assertTrue("more than one journal file", logFileCount > 1);
+
+        for (MessageId id : inserts.subList(0, inserts.size() - toLeave)) {
+            removeMessage(ms, id);
+        }
+
+        LevelDBStoreView view = new LevelDBStoreView(store);
+        view.compact();
+
+        int reducedLogFileCount = countLogFiles(store);
+        assertTrue("log files deleted", logFileCount > reducedLogFileCount);
+
+        store.stop();
+
+        deleteTheIndex(store);
+
+        assertEquals("log files remain", reducedLogFileCount, countLogFiles(store));
+
+        // restart, recover and verify message read
+        store.start();
+        ms = store.createQueueMessageStore(new ActiveMQQueue("TEST"));
+
+        assertEquals(toLeave + " messages remain", toLeave, getMessages(ms).size());
+    }
+
+    private void deleteTheIndex(LevelDBStore store) throws IOException {
+        for (String index : store.getLogDirectory().list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                LOG.info("dir:" + dir + ", name: " + name);
+                return (name != null && name.endsWith(".index"));
+            }
+        })) {
+
+            File file = new File(store.getLogDirectory().getAbsoluteFile(), index);
+            LOG.info("Deleting index directory:" + file);
+            FileUtils.deleteDirectory(file);
+        }
+
+    }
+
+    private int countLogFiles(LevelDBStore store) {
+        return store.getLogDirectory().list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                LOG.info("dir:" + dir + ", name: " + name);
+                return (name != null && name.endsWith(".log"));
+            }
+        }).length;
+    }
+
+    @After
+    public void stop() throws Exception {
+        for (LevelDBStore store : stores) {
+            if (store.isStarted()) {
+                store.stop();
+            }
+            FileUtils.deleteDirectory(store.directory());
+        }
+        stores.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b54606b1/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
index 181d11d..9ad57db 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
@@ -16,17 +16,17 @@
  */
 package org.apache.activemq.leveldb.test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import javax.jms.JMSException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.util.ArrayList;
-
 /**
  */
 public class ReplicationTestSupport {
@@ -60,6 +60,14 @@ public class ReplicationTestSupport {
         return message;
     }
 
+    static public void removeMessage(MessageStore ms, MessageId messageId) throws JMSException, IOException {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
+        ack.setFirstMessageId(messageId);
+        ack.setLastMessageId(messageId);
+        ms.removeMessage(new ConnectionContext(), ack);
+    }
+
     static public ArrayList<String> getMessages(MessageStore ms) throws Exception {
         final ArrayList<String> rc = new ArrayList<String>();
         ms.recover(new MessageRecoveryListener() {


Mime
View raw message