cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggles...@apache.org
Subject [1/6] cassandra git commit: Filter header only commit logs before recovery
Date Fri, 29 Sep 2017 22:38:54 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 ab0adf9f9 -> 95839aae2
  refs/heads/cassandra-3.11 2a24acfa9 -> be2117492
  refs/heads/trunk ebefc96a8 -> 77abf868a


Filter header only commit logs before recovery

Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-13918


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95839aae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95839aae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95839aae

Branch: refs/heads/cassandra-3.0
Commit: 95839aae2fde28fa29b16741de6bd52c0697843f
Parents: ab0adf9
Author: Blake Eggleston <bdeggleston@gmail.com>
Authored: Thu Sep 28 15:01:35 2017 -0700
Committer: Blake Eggleston <bdeggleston@gmail.com>
Committed: Fri Sep 29 15:05:30 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/CommitLogReplayer.java         | 57 +++++++++++++++++---
 .../db/commitlog/CommitLogSegment.java          |  2 +-
 .../db/commitlog/CompressedSegment.java         |  2 +-
 .../db/commitlog/MemoryMappedSegment.java       |  2 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 53 ++++++++++++++++++
 6 files changed, 108 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1f49cd..7ff61d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Filter header only commit logs before recovery (CASSANDRA-13918)
  * AssertionError prepending to a list (CASSANDRA-13149)
  * Fix support for SuperColumn tables (CASSANDRA-12373)
  * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index b3b26dd..4fd263c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -25,18 +25,15 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
 
 import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -138,11 +135,59 @@ public class CommitLogReplayer
         return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
     }
 
+    private static boolean shouldSkip(File file) throws IOException, ConfigurationException
+    {
+        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+        if (desc.version < CommitLogDescriptor.VERSION_21)
+        {
+            return false;
+        }
+        try(ChannelProxy channel = new ChannelProxy(file);
+            RandomAccessReader reader = RandomAccessReader.open(channel))
+        {
+            CommitLogDescriptor.readHeader(reader);
+            int end = reader.readInt();
+            long filecrc = reader.readInt() & 0xffffffffL;
+            return end == 0 && filecrc == 0;
+        }
+    }
+
+    private static List<File> filterCommitLogFiles(File[] toFilter)
+    {
+        List<File> filtered = new ArrayList<>(toFilter.length);
+        for (File file: toFilter)
+        {
+            try
+            {
+                if (shouldSkip(file))
+                {
+                    logger.info("Skipping playback of empty log: {}", file.getName());
+                }
+                else
+                {
+                    filtered.add(file);
+                }
+            }
+            catch (Exception e)
+            {
+                // let recover deal with it
+                filtered.add(file);
+            }
+        }
+
+        return filtered;
+    }
+
     public void recover(File[] clogs) throws IOException
     {
-        int i;
-        for (i = 0; i < clogs.length; ++i)
-            recover(clogs[i], i + 1 == clogs.length);
+        List<File> filteredLogs = filterCommitLogFiles(clogs);
+
+        int i = 0;
+        for (File clog: filteredLogs)
+        {
+            i++;
+            recover(clog, i == filteredLogs.size());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index f26f0dc..236a1b1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -315,7 +315,7 @@ public abstract class CommitLogSegment
         syncComplete.signalAll();
     }
 
-    protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
+    protected static void writeSyncMarker(long id, ByteBuffer buffer, int offset, int filePos, int nextMarker)
     {
         CRC32 crc = new CRC32();
         updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c73a30a..c00ce18 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -134,7 +134,7 @@ public class CompressedSegment extends CommitLogSegment
 
             // Only one thread can be here at a given time.
             // Protected by synchronization on CommitLogSegment.sync().
-            writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
+            writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
             commitLog.allocator.addSize(compressedBuffer.limit());
             channel.write(compressedBuffer);
             assert channel.position() - lastWrittenPos == compressedBuffer.limit();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 326469f..3a16d91 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -77,7 +77,7 @@ public class MemoryMappedSegment extends CommitLogSegment
 
         // write previous sync marker to point to next sync marker
         // we don't chain the crcs here to ensure this method is idempotent if it fails
-        writeSyncMarker(buffer, startMarker, startMarker, nextMarker);
+        writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
 
         try {
             SyncUtil.force((MappedByteBuffer) buffer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1543415..9e9ee53 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -30,6 +30,7 @@ import java.util.function.BiConsumer;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.io.Files;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -134,6 +135,58 @@ public class CommitLogTest
         CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) });
     }
 
+    /**
+     * Since commit log segments can be allocated before they're needed, the commit log file with the highest
+     * id isn't necessarily the last log that we wrote to. We should remove header only logs on recover so we
+     * can tolerate truncated logs
+     */
+    @Test
+    public void testHeaderOnlyFileFiltering() throws Exception
+    {
+        File directory = Files.createTempDir();
+
+        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null);
+        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null);
+
+        ByteBuffer buffer;
+
+        // this has a header and malformed data
+        File file1 = new File(directory, desc1.fileName());
+        buffer = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buffer, desc1);
+        int pos = buffer.position();
+        CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128);
+        buffer.position(pos + 8);
+        buffer.putInt(5);
+        buffer.putInt(6);
+
+        try (OutputStream lout = new FileOutputStream(file1))
+        {
+            lout.write(buffer.array());
+        }
+
+        // this has only a header
+        File file2 = new File(directory, desc2.fileName());
+        buffer = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buffer, desc2);
+        try (OutputStream lout = new FileOutputStream(file2))
+        {
+            lout.write(buffer.array());
+        }
+
+        // one corrupt file and one header only file should be ok
+        runExpecting(() -> {
+            CommitLog.instance.recover(file1, file2);
+            return null;
+        }, null);
+
+        // 2 corrupt files and one header only file should fail
+        runExpecting(() -> {
+            CommitLog.instance.recover(file1, file1, file2);
+            return null;
+        }, CommitLogReplayException.class);
+    }
+
     @Test
     public void testRecoveryWithEmptyLog20() throws Exception
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message