cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [3/8] cassandra git commit: Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush. Reviewed by aweisberg for CASSANDRA-9749
Date Wed, 19 Aug 2015 01:16:54 GMT
Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush.
Reviewed by aweisberg for CASSANDRA-9749


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fce8478f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fce8478f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fce8478f

Branch: refs/heads/trunk
Commit: fce8478ff43232d7e3623e457b1252f9700b451d
Parents: 6d9fa37
Author: Branimir Lambov <branimir.lambov@datastax.com>
Authored: Thu Jul 30 20:59:16 2015 +0300
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue Aug 18 20:13:26 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogReplayer.java         | 120 ++++-
 .../CommitLog-5-1438186885380.log               | Bin 0 -> 839051 bytes
 .../legacy-commitlog/2.2-lz4-bitrot/hash.txt    |   6 +
 .../CommitLog-5-1438186885380.log               | Bin 0 -> 839051 bytes
 .../legacy-commitlog/2.2-lz4-bitrot2/hash.txt   |   6 +
 .../CommitLog-5-1438186885380.log               | Bin 0 -> 839001 bytes
 .../legacy-commitlog/2.2-lz4-truncated/hash.txt |   5 +
 .../db/CommitLogFailurePolicyTest.java          | 141 -----
 .../org/apache/cassandra/db/CommitLogTest.java  | 421 ---------------
 .../commitlog/CommitLogFailurePolicyTest.java   | 140 +++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 535 +++++++++++++++++++
 .../db/commitlog/CommitLogUpgradeTest.java      |  55 ++
 14 files changed, 844 insertions(+), 590 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1299aa2..b9af440 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.1
+ * Apply commit_failure_policy to more errors on startup (CASSANDRA-9749)
  * Fix histogram overflow exception (CASSANDRA-9973)
  * Route gossip messages over dedicated socket (CASSANDRA-9237)
  * Add checksum to saved cache files (CASSANDRA-9265)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b3f944d..f23ebae 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -189,7 +189,9 @@ public class CommitLog implements CommitLogMBean
      */
     public void recover(String path) throws IOException
     {
-        recover(new File(path));
+        CommitLogReplayer recovery = CommitLogReplayer.create();
+        recovery.recover(new File(path), false);
+        recovery.blockForWrites();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 176f64b..af515d2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -34,12 +34,9 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.github.tjake.ICRC32;
 
 import org.apache.cassandra.concurrent.Stage;
@@ -64,6 +61,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public class CommitLogReplayer
 {
+    static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
     private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
@@ -142,14 +140,15 @@ public class CommitLogReplayer
 
     public void recover(File[] clogs) throws IOException
     {
-        for (final File file : clogs)
-            recover(file);
+        int i;
+        for (i = 0; i < clogs.length; ++i)
+            recover(clogs[i], i + 1 == clogs.length);
     }
 
     public int blockForWrites()
     {
         for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
-            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
+            logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
 
         // wait for all the writes to finish on the mutation stage
         FBUtilities.waitOnFutures(futures);
@@ -163,7 +162,7 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
-    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
     {
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
@@ -181,13 +180,17 @@ public class CommitLogReplayer
         {
             if (end != 0 || filecrc != 0)
             {
-                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+                handleReplayError(false,
+                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+                                  "The end of segment marker should be zero.",
+                                  offset, reader.getPath());
             }
             return -1;
         }
         else if (end < offset || end > reader.length())
         {
-            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+            handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
+                              offset, reader.getPath());
             return -1;
         }
         return end;
@@ -268,8 +271,7 @@ public class CommitLogReplayer
         }
     }
 
-    @SuppressWarnings("resource")
-    public void recover(File file) throws IOException
+    public void recover(File file, boolean tolerateTruncation) throws IOException
     {
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
@@ -281,7 +283,7 @@ public class CommitLogReplayer
                     return;
                 if (globalPosition.segment == desc.id)
                     reader.seek(globalPosition.position);
-                replaySyncSection(reader, (int) reader.getPositionLimit(), desc);
+                replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation);
                 return;
             }
 
@@ -295,10 +297,15 @@ public class CommitLogReplayer
                 desc = null;
             }
             if (desc == null) {
-                logger.warn("Could not read commit log descriptor in file {}", file);
+                handleReplayError(false, "Could not read commit log descriptor in file %s", file);
                 return;
             }
-            assert segmentId == desc.id;
+            if (segmentId != desc.id)
+            {
+                handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
+                // continue processing if ignored.
+            }
+
             if (logAndCheckIfShouldSkip(file, desc))
                 return;
 
@@ -311,7 +318,7 @@ public class CommitLogReplayer
                 }
                 catch (ConfigurationException e)
                 {
-                    logger.warn("Unknown compression: {}", e.getMessage());
+                    handleReplayError(false, "Unknown compression: %s", e.getMessage());
                     return;
                 }
             }
@@ -320,7 +327,7 @@ public class CommitLogReplayer
             int end = (int) reader.getFilePointer();
             int replayEnd = end;
 
-            while ((end = readSyncMarker(desc, end, reader)) >= 0)
+            while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
             {
                 int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
 
@@ -340,11 +347,17 @@ public class CommitLogReplayer
                     continue;
 
                 FileDataInput sectionReader = reader;
+                String errorContext = desc.fileName();
+                // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
+                boolean tolerateErrorsInSection = tolerateTruncation;
                 if (compressor != null)
                 {
+                    // In the compressed case we know if this is the last section.
+                    tolerateErrorsInSection &= end == reader.length() || end < 0;
+
+                    int start = (int) reader.getFilePointer();
                     try
                     {
-                        int start = (int) reader.getFilePointer();
                         int compressedLength = end - start;
                         if (logger.isDebugEnabled())
                             logger.trace("Decompressing {} between replay positions {} and {}",
@@ -359,15 +372,18 @@ public class CommitLogReplayer
                             uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
                         compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
                         sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
+                        errorContext = "compressed section at " + start + " in " + errorContext;
                     }
-                    catch (IOException e)
+                    catch (IOException | ArrayIndexOutOfBoundsException e)
                     {
-                        logger.error("Unexpected exception decompressing section {}", e);
+                        handleReplayError(tolerateErrorsInSection,
+                                          "Unexpected exception decompressing section at %d: %s",
+                                          start, e);
                         continue;
                     }
                 }
 
-                if (!replaySyncSection(sectionReader, replayEnd, desc))
+                if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
                     break;
             }
         }
@@ -399,13 +415,14 @@ public class CommitLogReplayer
      *
      * @return Whether replay should continue with the next section.
      */
-    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException
+    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
     {
          /* read the logs populate Mutation and apply */
         while (reader.getFilePointer() < end && !reader.isEOF())
         {
+            long mutationStart = reader.getFilePointer();
             if (logger.isDebugEnabled())
-                logger.trace("Reading mutation at {}", reader.getFilePointer());
+                logger.trace("Reading mutation at {}", mutationStart);
 
             long claimedCRC32;
             int serializedSize;
@@ -424,7 +441,12 @@ public class CommitLogReplayer
                 // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                 // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                 if (serializedSize < 10)
+                {
+                    handleReplayError(tolerateErrors,
+                                      "Invalid mutation size %d at %d in %s",
+                                      serializedSize, mutationStart, errorContext);
                     return false;
+                }
 
                 long claimedSizeChecksum;
                 if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -438,7 +460,12 @@ public class CommitLogReplayer
                     checksum.updateInt(serializedSize);
 
                 if (checksum.getValue() != claimedSizeChecksum)
+                {
+                    handleReplayError(tolerateErrors,
+                                      "Mutation size checksum failure at %d in %s",
+                                      mutationStart, errorContext);
                     return false;
+                }
                 // ok.
 
                 if (serializedSize > buffer.length)
@@ -451,14 +478,18 @@ public class CommitLogReplayer
             }
             catch (EOFException eof)
             {
+                handleReplayError(tolerateErrors,
+                                  "Unexpected end of segment",
+                                  mutationStart, errorContext);
                 return false; // last CL entry didn't get completely written. that's ok.
             }
 
             checksum.update(buffer, 0, serializedSize);
             if (claimedCRC32 != checksum.getValue())
             {
-                // this entry must not have been fsynced. probably the rest is bad too,
-                // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                handleReplayError(tolerateErrors,
+                                  "Mutation checksum failure at %d in %s",
+                                  mutationStart, errorContext);
                 continue;
             }
             replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
@@ -508,9 +539,13 @@ public class CommitLogReplayer
                 out.write(inputBuffer, 0, size);
             }
 
-            String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
-                                      f.getAbsolutePath());
-            logger.error(st, t);
+            // Checksum passed so this error can't be permissible.
+            handleReplayError(false,
+                              "Unexpected error deserializing mutation; saved to %s.  " +
+                              "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
+                              "Exception follows: %s",
+                              f.getAbsolutePath(),
+                              t);
             return;
         }
 
@@ -578,4 +613,35 @@ public class CommitLogReplayer
         }
         return false;
     }
+
+    static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException
+    {
+        String msg = String.format(message, messageArgs);
+        IOException e = new CommitLogReplayException(msg);
+        if (permissible)
+            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
+        else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
+            logger.error("Ignoring commit log replay error", e);
+        else if (!CommitLog.handleCommitError("Failed commit log replay", e))
+        {
+            logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
+                         "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
+                         "on the command line");
+            throw e;
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static class CommitLogReplayException extends IOException
+    {
+        public CommitLogReplayException(String message, Throwable cause)
+        {
+            super(message, cause);
+        }
+
+        public CommitLogReplayException(String message)
+        {
+            super(message);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..d248d59
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
new file mode 100644
index 0000000..c4d8fe7
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
@@ -0,0 +1,6 @@
+#CommitLog bitrot test, version 2.2.0-SNAPSHOT
+#This is a copy of 2.2-lz4 with some overwritten bytes.
+#Replaying this should result in an error which can be overridden.
+cells=6051
+hash=-170208326
+cfid=dc32ce20-360d-11e5-826c-afadad37221d

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..083d65c
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
new file mode 100644
index 0000000..c49dda0
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
@@ -0,0 +1,6 @@
+#CommitLog upgrade test, version 2.2.0-SNAPSHOT
+#This is a copy of 2.2-lz4 with some overwritten bytes.
+#Replaying this should result in an error which can be overridden.
+cells=6037
+hash=-1312748407
+cfid=dc32ce20-360d-11e5-826c-afadad37221d

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..939d408
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
new file mode 100644
index 0000000..ce7f600
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
@@ -0,0 +1,5 @@
+#Truncated CommitLog test.
+#This is a copy of 2.2-lz4 with the last 50 bytes deleted.
+cells=6037
+hash=-889057729
+cfid=dc32ce20-360d-11e5-826c-afadad37221d

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
deleted file mode 100644
index 0ecab3c..0000000
--- a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.db;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-public class CommitLogFailurePolicyTest
-{
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
-    }
-
-    @Test
-    public void testCommitFailurePolicy_stop() throws ConfigurationException
-    {
-        CassandraDaemon daemon = new CassandraDaemon();
-        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
-        StorageService.instance.registerDaemon(daemon);
-
-        // Need storage service active so stop policy can shutdown gossip
-        StorageService.instance.initServer();
-        Assert.assertTrue(Gossiper.instance.isEnabled());
-
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
-            CommitLog.handleCommitError("Test stop error", new Throwable());
-            Assert.assertFalse(Gossiper.instance.isEnabled());
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-        }
-    }
-
-    @Test
-    public void testCommitFailurePolicy_die()
-    {
-        CassandraDaemon daemon = new CassandraDaemon();
-        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
-        StorageService.instance.registerDaemon(daemon);
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
-            CommitLog.handleCommitError("Testing die policy", new Throwable());
-            Assert.assertTrue(killerForTests.wasKilled());
-            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-
-    @Test
-    public void testCommitFailurePolicy_ignore_beforeStartup()
-    {
-        //startup was not completed successfuly (since method completeSetup() was not called)
-        CassandraDaemon daemon = new CassandraDaemon();
-        StorageService.instance.registerDaemon(daemon);
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
-            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
-            //even though policy is ignore, JVM must die because Daemon has not finished initializing
-            Assert.assertTrue(killerForTests.wasKilled());
-            Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-
-    @Test
-    public void testCommitFailurePolicy_ignore_afterStartup() throws Exception
-    {
-        CassandraDaemon daemon = new CassandraDaemon();
-        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
-        StorageService.instance.registerDaemon(daemon);
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
-            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
-            //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup
-            Assert.assertFalse(killerForTests.wasKilled());
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
deleted file mode 100644
index 2764da4..0000000
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.db;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-public class CommitLogTest
-{
-    private static final String KEYSPACE1 = "CommitLogTest";
-    private static final String KEYSPACE2 = "CommitLogTestNonDurable";
-    private static final String CF1 = "Standard1";
-    private static final String CF2 = "Standard2";
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF2));
-        SchemaLoader.createKeyspace(KEYSPACE2,
-                                    false,
-                                    true,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF2));
-        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
-        CompactionManager.instance.disableAutoCompaction();
-    }
-
-    @Test
-    public void testRecoveryWithEmptyLog() throws Exception
-    {
-        CommitLog.instance.recover(new File[]{ tmpFile() });
-    }
-
-    @Test
-    public void testRecoveryWithShortLog() throws Exception
-    {
-        // force EOF while reading log
-        testRecoveryWithBadSizeArgument(100, 10);
-    }
-
-    @Test
-    public void testRecoveryWithShortSize() throws Exception
-    {
-        testRecovery(new byte[2]);
-    }
-
-    @Test
-    public void testRecoveryWithShortCheckSum() throws Exception
-    {
-        testRecovery(new byte[6]);
-    }
-
-    @Test
-    public void testRecoveryWithGarbageLog() throws Exception
-    {
-        byte[] garbage = new byte[100];
-        (new java.util.Random()).nextBytes(garbage);
-        testRecovery(garbage);
-    }
-
-    @Test
-    public void testRecoveryWithBadSizeChecksum() throws Exception
-    {
-        Checksum checksum = new CRC32();
-        checksum.update(100);
-        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
-    }
-
-    @Test
-    public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
-    {
-        // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
-        testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
-    }
-
-    @Test
-    public void testRecoveryWithNegativeSizeArgument() throws Exception
-    {
-        // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
-        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
-    }
-
-    @Test
-    public void testDontDeleteIfDirty() throws Exception
-    {
-        CommitLog.instance.resetUnsafe(true);
-        // Roughly 32 MB mutation
-        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
-        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
-
-        // Adding it 5 times
-        CommitLog.instance.add(rm);
-        CommitLog.instance.add(rm);
-        CommitLog.instance.add(rm);
-        CommitLog.instance.add(rm);
-        CommitLog.instance.add(rm);
-
-        // Adding new mutation on another CF
-        Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
-        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0);
-        CommitLog.instance.add(rm2);
-
-        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
-
-        UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
-
-        // Assert we still have both our segment
-        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
-    }
-
-    @Test
-    public void testDeleteIfNotDirty() throws Exception
-    {
-        DatabaseDescriptor.getCommitLogSegmentSize();
-        CommitLog.instance.resetUnsafe(true);
-        // Roughly 32 MB mutation
-        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
-        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
-
-        // Adding it twice (won't change segment)
-        CommitLog.instance.add(rm);
-        CommitLog.instance.add(rm);
-
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-
-        // "Flush": this won't delete anything
-        UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
-        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
-
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-
-        // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
-        Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
-        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200), 0);
-        CommitLog.instance.add(rm2);
-        // also forces a new segment, since each entry-with-overhead is just under half the CL size
-        CommitLog.instance.add(rm2);
-        CommitLog.instance.add(rm2);
-
-        assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
-
-
-        // "Flush" second cf: The first segment should be deleted since we
-        // didn't write anything on cf1 since last flush (and we flush cf2)
-
-        UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
-
-        // Assert we still have both our segment
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-    }
-
-    private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
-    {
-        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
-        rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
-
-        int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
-        max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
-        return max - (int) Mutation.serializer.serializedSize(rm, MessagingService.current_version);
-    }
-
-    private static int getMaxRecordDataSize()
-    {
-        return getMaxRecordDataSize(KEYSPACE1, bytes("k"), CF1, Util.cellname("c1"));
-    }
-
-    // CASSANDRA-3615
-    @Test
-    public void testEqualRecordLimit() throws Exception
-    {
-        CommitLog.instance.resetUnsafe(true);
-
-        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
-        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
-        CommitLog.instance.add(rm);
-    }
-
-    @Test
-    public void testExceedRecordLimit() throws Exception
-    {
-        CommitLog.instance.resetUnsafe(true);
-        try
-        {
-            Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
-            rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(1 + getMaxRecordDataSize()), 0);
-            CommitLog.instance.add(rm);
-            throw new AssertionError("mutation larger than limit was accepted");
-        }
-        catch (IllegalArgumentException e)
-        {
-            // IAE is thrown on too-large mutations
-        }
-    }
-
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
-    {
-        Checksum checksum = new CRC32();
-        checksum.update(size);
-        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
-    }
-
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
-    {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataOutputStream dout = new DataOutputStream(out);
-        dout.writeInt(size);
-        dout.writeLong(checksum);
-        dout.write(new byte[dataSize]);
-        dout.close();
-        testRecovery(out.toByteArray());
-    }
-
-    protected File tmpFile() throws IOException
-    {
-        File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log");
-        logFile.deleteOnExit();
-        assert logFile.length() == 0;
-        return logFile;
-    }
-
-    protected void testRecovery(byte[] logData) throws Exception
-    {
-        File logFile = tmpFile();
-        try (OutputStream lout = new FileOutputStream(logFile))
-        {
-            lout.write(logData);
-            //statics make it annoying to test things correctly
-            CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
-        }
-    }
-
-    @Test
-    public void testVersions()
-    {
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
-        Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
-        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
-        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
-        Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
-    }
-
-    @Test
-    public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
-    {
-        CommitLog.instance.resetUnsafe(true);
-        boolean prev = DatabaseDescriptor.isAutoSnapshot();
-        DatabaseDescriptor.setAutoSnapshot(false);
-        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
-        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
-
-        final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
-        rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
-        rm1.apply();
-        cfs1.truncateBlocking();
-        DatabaseDescriptor.setAutoSnapshot(prev);
-        final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
-        rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
-
-        for (int i = 0 ; i < 5 ; i++)
-            CommitLog.instance.add(rm2);
-
-        Assert.assertEquals(2, CommitLog.instance.activeSegments());
-        ReplayPosition position = CommitLog.instance.getContext();
-        for (Keyspace ks : Keyspace.system())
-            for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
-                CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
-        CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
-        Assert.assertEquals(1, CommitLog.instance.activeSegments());
-    }
-
-    @Test
-    public void testTruncateWithoutSnapshotNonDurable() throws IOException
-    {
-        CommitLog.instance.resetUnsafe(true);
-        boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
-        DatabaseDescriptor.setAutoSnapshot(false);
-        Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
-        Assert.assertFalse(notDurableKs.getMetadata().durableWrites);
-        ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
-        CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator();
-        Mutation rm;
-        DecoratedKey dk = Util.dk("key1");
-
-        // add data
-        rm = new Mutation(KEYSPACE2, dk.getKey());
-        rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
-        rm.apply();
-
-        ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
-        Row row = command.getRow(notDurableKs);
-        Cell col = row.cf.getColumn(Util.cellname("Column1"));
-        Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
-        cfs.truncateBlocking();
-        DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot);
-        row = command.getRow(notDurableKs);
-        Assert.assertEquals(null, row.cf);
-    }
-    
-    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
-    {
-        ByteBuffer buf = ByteBuffer.allocate(1024);
-        CommitLogDescriptor.writeHeader(buf, desc);
-        long length = buf.position();
-        // Put some extra data in the stream.
-        buf.putDouble(0.1);
-        buf.flip();
-        FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
-        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
-        Assert.assertEquals("Descriptors", desc, read);
-    }
-    
-    @Test
-    public void testDescriptorPersistence() throws IOException
-    {
-        testDescriptorPersistence(new CommitLogDescriptor(11, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
-                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
-    }
-
-    @Test
-    public void testDescriptorInvalidParametersSize() throws IOException
-    {
-        Map<String, String> params = new HashMap<>();
-        for (int i=0; i<65535; ++i)
-            params.put("key"+i, Integer.toString(i, 16));
-        try {
-            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
-                                                               21,
-                                                               new ParameterizedClass("LZ4Compressor", params));
-            ByteBuffer buf = ByteBuffer.allocate(1024000);
-            CommitLogDescriptor.writeHeader(buf, desc);
-            Assert.fail("Parameter object too long should fail on writing descriptor.");
-        } catch (ConfigurationException e)
-        {
-            // correct path
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
new file mode 100644
index 0000000..bde8ca3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
@@ -0,0 +1,140 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.db.commitlog;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+public class CommitLogFailurePolicyTest
+{
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Test
+    public void testCommitFailurePolicy_stop() throws ConfigurationException
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
+        // Need storage service active so stop policy can shutdown gossip
+        StorageService.instance.initServer();
+        Assert.assertTrue(Gossiper.instance.isEnabled());
+
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+            CommitLog.handleCommitError("Test stop error", new Throwable());
+            Assert.assertFalse(Gossiper.instance.isEnabled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+        }
+    }
+
+    @Test
+    public void testCommitFailurePolicy_die()
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+            CommitLog.handleCommitError("Testing die policy", new Throwable());
+            Assert.assertTrue(killerForTests.wasKilled());
+            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+
+    @Test
+    public void testCommitFailurePolicy_ignore_beforeStartup()
+    {
+        //startup was not completed successfuly (since method completeSetup() was not called)
+        CassandraDaemon daemon = new CassandraDaemon();
+        StorageService.instance.registerDaemon(daemon);
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
+            //even though policy is ignore, JVM must die because Daemon has not finished initializing
+            Assert.assertTrue(killerForTests.wasKilled());
+            Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+
+    @Test
+    public void testCommitFailurePolicy_ignore_afterStartup() throws Exception
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
+            //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup
+            Assert.assertFalse(killerForTests.wasKilled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
new file mode 100644
index 0000000..da8058c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -0,0 +1,535 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class CommitLogTest
+{
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String KEYSPACE2 = "CommitLogTestNonDurable";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF2));
+        SchemaLoader.createKeyspace(KEYSPACE2,
+                                    false,
+                                    true,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF2));
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void testRecoveryWithEmptyLog() throws Exception
+    {
+        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) });
+    }
+
+    @Test
+    public void testRecoveryWithEmptyLog20() throws Exception
+    {
+        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) });
+    }
+
+    @Test
+    public void testRecoveryWithZeroLog() throws Exception
+    {
+        testRecovery(new byte[10], null);
+    }
+
+    @Test
+    public void testRecoveryWithShortLog() throws Exception
+    {
+        // force EOF while reading log
+        testRecoveryWithBadSizeArgument(100, 10);
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void testRecoveryWithShortSize() throws Exception
+    {
+        testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+    }
+
+    @Test
+    public void testRecoveryWithShortCheckSum() throws Exception
+    {
+        byte[] data = new byte[8];
+        data[3] = 10;   // make sure this is not a legacy end marker.
+        testRecovery(data, CommitLogReplayException.class);
+    }
+
+    @Test
+    public void testRecoveryWithShortMutationSize() throws Exception
+    {
+        testRecoveryWithBadSizeArgument(9, 10);
+    }
+
+    private void testRecoveryWithGarbageLog() throws Exception
+    {
+        byte[] garbage = new byte[100];
+        (new java.util.Random()).nextBytes(garbage);
+        testRecovery(garbage, CommitLogDescriptor.current_version);
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void testRecoveryWithGarbageLog_fail() throws Exception
+    {
+        testRecoveryWithGarbageLog();
+    }
+
+    @Test
+    public void testRecoveryWithGarbageLog_ignoredByProperty() throws Exception
+    {
+        try {
+            System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+            testRecoveryWithGarbageLog();
+        } finally {
+            System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+        }
+    }
+
+    @Test
+    public void testRecoveryWithGarbageLog_ignoredByPolicy() throws Exception
+    {
+        CommitFailurePolicy existingPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try {
+            DatabaseDescriptor.setCommitFailurePolicy(CommitFailurePolicy.ignore);
+            testRecoveryWithGarbageLog();
+        } finally {
+            DatabaseDescriptor.setCommitFailurePolicy(existingPolicy);
+        }
+    }
+
+    @Test
+    public void testRecoveryWithBadSizeChecksum() throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(100);
+        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+    }
+
+    @Test
+    public void testRecoveryWithNegativeSizeArgument() throws Exception
+    {
+        // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
+        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+    }
+
+    @Test
+    public void testDontDeleteIfDirty() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+        // Roughly 32 MB mutation
+        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
+        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
+
+        // Adding it 5 times
+        CommitLog.instance.add(rm);
+        CommitLog.instance.add(rm);
+        CommitLog.instance.add(rm);
+        CommitLog.instance.add(rm);
+        CommitLog.instance.add(rm);
+
+        // Adding new mutation on another CF
+        Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
+        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0);
+        CommitLog.instance.add(rm2);
+
+        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+
+        UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+
+        // Assert we still have both our segment
+        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+    }
+
+    @Test
+    public void testDeleteIfNotDirty() throws Exception
+    {
+        DatabaseDescriptor.getCommitLogSegmentSize();
+        CommitLog.instance.resetUnsafe(true);
+        // Roughly 32 MB mutation
+        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
+        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
+
+        // Adding it twice (won't change segment)
+        CommitLog.instance.add(rm);
+        CommitLog.instance.add(rm);
+
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+
+        // "Flush": this won't delete anything
+        UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.sync(true);
+        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
+
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+
+        // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
+        Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
+        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200), 0);
+        CommitLog.instance.add(rm2);
+        // also forces a new segment, since each entry-with-overhead is just under half the CL size
+        CommitLog.instance.add(rm2);
+        CommitLog.instance.add(rm2);
+
+        assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
+
+
+        // "Flush" second cf: The first segment should be deleted since we
+        // didn't write anything on cf1 since last flush (and we flush cf2)
+
+        UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+
+        // Assert we still have both our segment
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+    }
+
+    private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
+    {
+        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
+        rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
+
+        int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
+        max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
+        return max - (int) Mutation.serializer.serializedSize(rm, MessagingService.current_version);
+    }
+
+    private static int getMaxRecordDataSize()
+    {
+        return getMaxRecordDataSize(KEYSPACE1, bytes("k"), CF1, Util.cellname("c1"));
+    }
+
+    // CASSANDRA-3615
+    @Test
+    public void testEqualRecordLimit() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+
+        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
+        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
+        CommitLog.instance.add(rm);
+    }
+
+    @Test
+    public void testExceedRecordLimit() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+        try
+        {
+            Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
+            rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(1 + getMaxRecordDataSize()), 0);
+            CommitLog.instance.add(rm);
+            throw new AssertionError("mutation larger than limit was accepted");
+        }
+        catch (IllegalArgumentException e)
+        {
+            // IAE is thrown on too-large mutations
+        }
+    }
+
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(size);
+        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
+    }
+
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(size);
+        dout.writeLong(checksum);
+        dout.write(new byte[dataSize]);
+        dout.close();
+        testRecovery(out.toByteArray(), CommitLogReplayException.class);
+    }
+
+    protected File tmpFile(int version) throws IOException
+    {
+        File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
+        logFile.deleteOnExit();
+        assert logFile.length() == 0;
+        return logFile;
+    }
+
+    protected Void testRecovery(byte[] logData, int version) throws Exception
+    {
+        File logFile = tmpFile(version);
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(logData);
+            //statics make it annoying to test things correctly
+            CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+        }
+        return null;
+    }
+
+    protected Void testRecovery(CommitLogDescriptor desc, byte[] logData) throws Exception
+    {
+        File logFile = tmpFile(desc.version);
+        CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
+        // Change id to match file.
+        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc);
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(buf.array(), 0, buf.position());
+            lout.write(logData);
+            //statics make it annoying to test things correctly
+            CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+        }
+        return null;
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void testRecoveryWithIdMismatch() throws Exception
+    {
+        CommitLogDescriptor desc = new CommitLogDescriptor(4, null);
+        File logFile = tmpFile(desc.version);
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc);
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(buf.array(), 0, buf.position());
+            //statics make it annoying to test things correctly
+            CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+        }
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void testRecoveryWithBadCompressor() throws Exception
+    {
+        CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null));
+        testRecovery(desc, new byte[0]);
+    }
+
+    protected void runExpecting(Callable<Void> r, Class<?> expected)
+    {
+        Throwable caught = null;
+        try
+        {
+            r.call();
+        }
+        catch (Throwable t)
+        {
+            if (expected != t.getClass())
+                throw new AssertionError("Expected exception " + expected + ", got " + t, t);
+            caught = t;
+        }
+        if (expected != null && caught == null)
+            Assert.fail("Expected exception " + expected + " but call completed successfully.");
+    }
+
+    protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
+    {
+        runExpecting(new Callable<Void>() {
+            public Void call() throws Exception
+            {
+                return testRecovery(logData, CommitLogDescriptor.VERSION_20);
+            }
+        }, expected);
+        runExpecting(new Callable<Void>() {
+            public Void call() throws Exception
+            {
+                return testRecovery(new CommitLogDescriptor(4, null), logData);
+            }
+        }, expected);
+    }
+
+    @Test
+    public void testVersions()
+    {
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+        Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+        Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+    }
+
+    @Test
+    public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+        boolean prev = DatabaseDescriptor.isAutoSnapshot();
+        DatabaseDescriptor.setAutoSnapshot(false);
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
+        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
+
+        final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
+        rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+        rm1.apply();
+        cfs1.truncateBlocking();
+        DatabaseDescriptor.setAutoSnapshot(prev);
+        final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
+        rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
+
+        for (int i = 0 ; i < 5 ; i++)
+            CommitLog.instance.add(rm2);
+
+        Assert.assertEquals(2, CommitLog.instance.activeSegments());
+        ReplayPosition position = CommitLog.instance.getContext();
+        for (Keyspace ks : Keyspace.system())
+            for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+                CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
+        CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
+        Assert.assertEquals(1, CommitLog.instance.activeSegments());
+    }
+
+    @Test
+    public void testTruncateWithoutSnapshotNonDurable() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+        boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
+        DatabaseDescriptor.setAutoSnapshot(false);
+        Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
+        Assert.assertFalse(notDurableKs.getMetadata().durableWrites);
+        ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
+        CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator();
+        Mutation rm;
+        DecoratedKey dk = Util.dk("key1");
+
+        // add data
+        rm = new Mutation(KEYSPACE2, dk.getKey());
+        rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
+        rm.apply();
+
+        ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
+        Row row = command.getRow(notDurableKs);
+        Cell col = row.cf.getColumn(Util.cellname("Column1"));
+        Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
+        cfs.truncateBlocking();
+        DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot);
+        row = command.getRow(notDurableKs);
+        Assert.assertEquals(null, row.cf);
+    }
+    
+    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+    {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc);
+        long length = buf.position();
+        // Put some extra data in the stream.
+        buf.putDouble(0.1);
+        buf.flip();
+        FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
+        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
+        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
+        Assert.assertEquals("Descriptors", desc, read);
+    }
+    
+    @Test
+    public void testDescriptorPersistence() throws IOException
+    {
+        testDescriptorPersistence(new CommitLogDescriptor(11, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
+                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+    }
+
+    @Test
+    public void testDescriptorInvalidParametersSize() throws IOException
+    {
+        Map<String, String> params = new HashMap<>();
+        for (int i=0; i<6000; ++i)
+            params.put("key"+i, Integer.toString(i, 16));
+        try {
+            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
+                                                               21,
+                                                               new ParameterizedClass("LZ4Compressor", params));
+            ByteBuffer buf = ByteBuffer.allocate(1024000);
+            CommitLogDescriptor.writeHeader(buf, desc);
+            Assert.fail("Parameter object too long should fail on writing descriptor.");
+        } catch (ConfigurationException e)
+        {
+            // correct path
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index 1655078..2031475 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -36,10 +36,13 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.config.Config.CommitFailurePolicy;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 
 public class CommitLogUpgradeTest
 {
@@ -65,6 +68,58 @@ public class CommitLogUpgradeTest
         testRestore(DATA_DIR + "2.1");
     }
 
+    @Test
+    public void test22_truncated() throws Exception
+    {
+        testRestore(DATA_DIR + "2.2-lz4-truncated");
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void test22_bitrot() throws Exception
+    {
+        testRestore(DATA_DIR + "2.2-lz4-bitrot");
+    }
+
+    @Test
+    public void test22_bitrot_ignored() throws Exception
+    {
+        try {
+            System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+            testRestore(DATA_DIR + "2.2-lz4-bitrot");
+        } finally {
+            System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+        }
+    }
+
+    @Test
+    public void test22_bitrot_ignore_policy() throws Exception
+    {
+        CommitFailurePolicy existingPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try {
+            DatabaseDescriptor.setCommitFailurePolicy(CommitFailurePolicy.ignore);
+            testRestore(DATA_DIR + "2.2-lz4-bitrot");
+        } finally {
+            DatabaseDescriptor.setCommitFailurePolicy(existingPolicy);
+        }
+    }
+
+    @Test(expected = CommitLogReplayException.class)
+    public void test22_bitrot2() throws Exception
+    {
+        testRestore(DATA_DIR + "2.2-lz4-bitrot2");
+    }
+
+    @Test
+    public void test22_bitrot2_ignored() throws Exception
+    {
+        try {
+            System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+            testRestore(DATA_DIR + "2.2-lz4-bitrot2");
+        } finally {
+            System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+        }
+    }
+
     @BeforeClass
     static public void initialize() throws FileNotFoundException, IOException, InterruptedException
     {


Mime
View raw message