activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: [AMQ-6831, AMQ-6771] fix up recovery check to ensure full batch is available in memory, regression from AMQ-6771
Date Tue, 10 Oct 2017 16:33:35 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 0e0f5ea8f -> 0f544fd54


[AMQ-6831, AMQ-6771] fix up recovery check to ensure full batch is available in memory, regression
from AMQ-6771

(cherry picked from commit f9899922783e0e94de030f4c867e5d48a3d869a9)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0f544fd5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0f544fd5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0f544fd5

Branch: refs/heads/activemq-5.15.x
Commit: 0f544fd54bde5d39f5e6e19d189580884740768b
Parents: 0e0f5ea
Author: gtully <gary.tully@gmail.com>
Authored: Tue Oct 10 17:24:09 2017 +0100
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Oct 10 12:29:24 2017 -0400

----------------------------------------------------------------------
 .../store/kahadb/disk/journal/Journal.java      | 10 +++++-
 .../store/kahadb/MessageDatabaseTest.java       | 36 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0f544fd5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 5edee92..8e04414 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -567,10 +567,11 @@ public class Journal {
     }
 
     private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
-
+        ensureAvailable(bs, reader, EOF_RECORD.length);
         if (bs.startsWith(EOF_RECORD)) {
             return 0; // eof
         }
+        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
         try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
 
             // Assert that it's a batch record.
@@ -623,6 +624,13 @@ public class Journal {
         }
     }
 
+    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
+        if (bs.remaining() < required) {
+            bs.reset();
+            bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - bs.length));
+        }
+    }
+
     void addToTotalLength(int size) {
         totalLength.addAndGet(size);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0f544fd5/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
index d09be68..604b46f 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.activemq.store.kahadb;
 
+import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.ByteSequence;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -28,6 +30,7 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
 import static org.junit.Assert.*;
 
 public class MessageDatabaseTest {
@@ -78,4 +81,37 @@ public class MessageDatabaseTest {
         assertNull("audit location should be null", kaha.getMetadata().producerSequenceIdTrackerLocation);
     }
 
+    @Test
+    public void testRecoverCheckOnBatchBoundary() throws Exception {
+
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File(temporaryFolder.getRoot(), "kaha2"));
+        kaha.setCheckpointInterval(0l); // disable periodic checkpoint
+        kaha.setCheckForCorruptJournalFiles(true);
+        kaha.setChecksumJournalFiles(true);
+        kaha.setMaxFailoverProducersToTrack(10);
+        kaha.setBrokerService(new BrokerService() {
+            public void handleIOException(IOException exception) {
+                exception.printStackTrace();
+            }
+        });
+        kaha.start();
+
+        // track original metadata reference to ensure it is read from the journal on recovery
+        ActiveMQMessageAuditNoSync auditToVerify = kaha.getMetadata().producerSequenceIdTracker;
+        final String messsageId = "1:1:1:1";
+        auditToVerify.isDuplicate(messsageId);
+
+        ByteSequence byteSequence = new ByteSequence(new byte[DEFAULT_MAX_WRITE_BATCH_SIZE - 110]);
+        kaha.getJournal().write(byteSequence, false);
+        kaha.getJournal().write(byteSequence, false);
+
+        kaha.stop();
+        try {
+            kaha.start();
+            assertTrue("Value from journal recovered ok", kaha.getMetadata().producerSequenceIdTracker.isDuplicate(messsageId));
+        } finally {
+            kaha.stop();
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message