cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmcken...@apache.org
Subject [1/5] cassandra git commit: Add Change Data Capture
Date Thu, 16 Jun 2016 13:55:37 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk ed538f90e -> 5dcab286c


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
new file mode 100644
index 0000000..edff3b7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+public class CommitLogReaderTest extends CQLTester
+{
+    @BeforeClass
+    public static void beforeClass()
+    {
+        DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+        JVMStabilityInspector.replaceKiller(new KillerForTests(false));
+    }
+
+    @Before
+    public void before() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+    }
+
+    @Test
+    public void testReadAll() throws Throwable
+    {
+        int samples = 1000;
+        populateData(samples);
+        ArrayList<File> toCheck = getCommitLogs();
+
+        CommitLogReader reader = new CommitLogReader();
+
+        TestCLRHandler testHandler = new TestCLRHandler(currentTableMetadata());
+        for (File f : toCheck)
+            reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+
+        Assert.assertEquals("Expected 1000 seen mutations, got: " + testHandler.seenMutationCount(),
+                            1000, testHandler.seenMutationCount());
+
+        confirmReadOrder(testHandler, 0);
+    }
+
+    @Test
+    public void testReadCount() throws Throwable
+    {
+        int samples = 50;
+        int readCount = 10;
+        populateData(samples);
+        ArrayList<File> toCheck = getCommitLogs();
+
+        CommitLogReader reader = new CommitLogReader();
+        TestCLRHandler testHandler = new TestCLRHandler();
+
+        for (File f : toCheck)
+            reader.readCommitLogSegment(testHandler, f, readCount - testHandler.seenMutationCount(), false);
+
+        Assert.assertEquals("Expected " + readCount + " seen mutations, got: " + testHandler.seenMutations.size(),
+                            readCount, testHandler.seenMutationCount());
+    }
+
+    @Test
+    public void testReadFromMidpoint() throws Throwable
+    {
+        int samples = 1000;
+        int readCount = 500;
+        CommitLogPosition midpoint = populateData(samples);
+        ArrayList<File> toCheck = getCommitLogs();
+
+        CommitLogReader reader = new CommitLogReader();
+        TestCLRHandler testHandler = new TestCLRHandler();
+
+        // Will skip on incorrect segments due to id mismatch on midpoint
+        for (File f : toCheck)
+            reader.readCommitLogSegment(testHandler, f, midpoint, readCount, false);
+
+        // Confirm correct count on replay
+        Assert.assertEquals("Expected " + readCount + " seen mutations, got: " + testHandler.seenMutations.size(),
+                            readCount, testHandler.seenMutationCount());
+
+        confirmReadOrder(testHandler, samples / 2);
+    }
+
+    @Test
+    public void testReadFromMidpointTooMany() throws Throwable
+    {
+        int samples = 1000;
+        int readCount = 5000;
+        CommitLogPosition midpoint = populateData(samples);
+        ArrayList<File> toCheck = getCommitLogs();
+
+        CommitLogReader reader = new CommitLogReader();
+        TestCLRHandler testHandler = new TestCLRHandler(currentTableMetadata());
+
+        // Reading from mid to overflow by 4.5k
+        // Will skip on incorrect segments due to id mismatch on midpoint
+        for (File f : toCheck)
+            reader.readCommitLogSegment(testHandler, f, midpoint, readCount, false);
+
+        Assert.assertEquals("Expected " + samples / 2 + " seen mutations, got: " + testHandler.seenMutations.size(),
+                            samples / 2, testHandler.seenMutationCount());
+
+        confirmReadOrder(testHandler, samples / 2);
+    }
+
+    @Test
+    public void testReadCountFromMidpoint() throws Throwable
+    {
+        int samples = 1000;
+        int readCount = 10;
+        CommitLogPosition midpoint = populateData(samples);
+        ArrayList<File> toCheck = getCommitLogs();
+
+        CommitLogReader reader = new CommitLogReader();
+        TestCLRHandler testHandler = new TestCLRHandler();
+
+        for (File f: toCheck)
+            reader.readCommitLogSegment(testHandler, f, midpoint, readCount, false);
+
+        // Confirm correct count on replay
+        Assert.assertEquals("Expected " + readCount + " seen mutations, got: " + testHandler.seenMutations.size(),
+            readCount, testHandler.seenMutationCount());
+
+        confirmReadOrder(testHandler, samples / 2);
+    }
+
+    /**
+     * Since we have both cfm and non mixed into the CL, we ignore updates that aren't for the cfm the test handler
+     * is configured to check.
+     * @param handler
+     * @param offset integer offset of count we expect to see in record
+     */
+    private void confirmReadOrder(TestCLRHandler handler, int offset)
+    {
+        ColumnDefinition cd = currentTableMetadata().getColumnDefinition(new ColumnIdentifier("data", false));
+        int i = 0;
+        int j = 0;
+        while (i + j < handler.seenMutationCount())
+        {
+            PartitionUpdate pu = handler.seenMutations.get(i + j).get(currentTableMetadata());
+            if (pu == null)
+            {
+                j++;
+                continue;
+            }
+
+            for (Row r : pu)
+            {
+                String expected = Integer.toString(i + offset);
+                String seen = new String(r.getCell(cd).value().array());
+                if (!expected.equals(seen))
+                    Assert.fail("Mismatch at index: " + i + ". Offset: " + offset + " Expected: " + expected + " Seen: " + seen);
+            }
+            i++;
+        }
+    }
+
+    static ArrayList<File> getCommitLogs()
+    {
+        File dir = new File(DatabaseDescriptor.getCommitLogLocation());
+        File[] files = dir.listFiles();
+        ArrayList<File> results = new ArrayList<>();
+        for (File f : files)
+        {
+            if (f.isDirectory())
+                continue;
+            results.add(f);
+        }
+        Assert.assertTrue("Didn't find any commit log files.", 0 != results.size());
+        return results;
+    }
+
+    static class TestCLRHandler implements CommitLogReadHandler
+    {
+        public List<Mutation> seenMutations = new ArrayList<Mutation>();
+        public boolean sawStopOnErrorCheck = false;
+
+        private final CFMetaData cfm;
+
+        // Accept all
+        public TestCLRHandler()
+        {
+            this.cfm = null;
+        }
+
+        public TestCLRHandler(CFMetaData cfm)
+        {
+            this.cfm = cfm;
+        }
+
+        public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
+        {
+            sawStopOnErrorCheck = true;
+            return false;
+        }
+
+        public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
+        {
+            sawStopOnErrorCheck = true;
+        }
+
+        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
+        {
+            if ((cfm == null) || (cfm != null && m.get(cfm) != null)) {
+                seenMutations.add(m);
+            }
+        }
+
+        public int seenMutationCount() { return seenMutations.size(); }
+    }
+
+    /**
+     * Returns offset of active written data at halfway point of data
+     */
+    CommitLogPosition populateData(int entryCount) throws Throwable
+    {
+        Assert.assertEquals("entryCount must be an even number.", 0, entryCount % 2);
+
+        createTable("CREATE TABLE %s (idx INT, data TEXT, PRIMARY KEY(idx));");
+        int midpoint = entryCount / 2;
+
+        for (int i = 0; i < midpoint; i++) {
+            execute("INSERT INTO %s (idx, data) VALUES (?, ?)", i, Integer.toString(i));
+        }
+
+        CommitLogPosition result = CommitLog.instance.getCurrentPosition();
+
+        for (int i = midpoint; i < entryCount; i++)
+            execute("INSERT INTO %s (idx, data) VALUES (?, ?)", i, Integer.toString(i));
+
+        Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
new file mode 100644
index 0000000..e308a2f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.FileUtils;
+
+public class CommitLogSegmentManagerCDCTest extends CQLTester
+{
+    private static Random random = new Random();
+
+    @BeforeClass
+    public static void checkConfig()
+    {
+        Assume.assumeTrue(DatabaseDescriptor.isCDCEnabled());
+    }
+
+    @Before
+    public void before() throws IOException
+    {
+        // disable reserve segment to get more deterministic allocation/testing of CDC boundary states
+        CommitLog.instance.forceRecycleAllSegments();
+        for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+            FileUtils.deleteWithConfirm(f);
+    }
+
+    @Test
+    public void testCDCWriteTimeout() throws Throwable
+    {
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+        CFMetaData cfm = currentTableMetadata();
+
+        // Confirm that logic to check for whether or not we can allocate new CDC segments works
+        Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
+        try
+        {
+            DatabaseDescriptor.setCDCSpaceInMB(32);
+            // Spin until we hit CDC capacity and make sure we get a WriteTimeout
+            try
+            {
+                // Should trigger on anything < 20:1 compression ratio during compressed test
+                for (int i = 0; i < 100; i++)
+                {
+                    new RowUpdateBuilder(cfm, 0, i)
+                        .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                        .build().apply();
+                }
+                Assert.fail("Expected WriteTimeoutException from full CDC but did not receive it.");
+            }
+            catch (WriteTimeoutException e)
+            {
+                // expected, do nothing
+            }
+            expectCurrentCDCState(CDCState.FORBIDDEN);
+
+            // Confirm we can create a non-cdc table and write to it even while at cdc capacity
+            createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
+            execute("INSERT INTO %s (idx, data) VALUES (1, '1');");
+
+            // Confirm that, on flush+recyle, we see files show up in cdc_raw
+            Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
+            CommitLog.instance.forceRecycleAllSegments();
+            cdcMgr.awaitManagementTasksCompletion();
+            Assert.assertTrue("Expected files to be moved to overflow.", getCDCRawCount() > 0);
+
+            // Simulate a CDC consumer reading files then deleting them
+            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+                FileUtils.deleteWithConfirm(f);
+
+            // Update size tracker to reflect deleted files. Should flip flag on current allocatingFrom to allow.
+            cdcMgr.updateCDCTotalSize();
+            expectCurrentCDCState(CDCState.PERMITTED);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
+        }
+    }
+
+    @Test
+    public void testCLSMCDCDiscardLogic() throws Throwable
+    {
+        CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
+        for (int i = 0; i < 8; i++)
+        {
+            new RowUpdateBuilder(currentTableMetadata(), 0, i)
+                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                .build().apply();
+        }
+
+        // Should have 4 segments CDC since we haven't flushed yet, 3 PERMITTED, one of which is active, and 1 PERMITTED, in waiting
+        Assert.assertEquals(4 * DatabaseDescriptor.getCommitLogSegmentSize(), cdcMgr.updateCDCTotalSize());
+        expectCurrentCDCState(CDCState.PERMITTED);
+        CommitLog.instance.forceRecycleAllSegments();
+
+        // on flush, these PERMITTED should be deleted
+        Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        for (int i = 0; i < 8; i++)
+        {
+            new RowUpdateBuilder(currentTableMetadata(), 0, i)
+                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                .build().apply();
+        }
+        // 4 total again, 3 CONTAINS, 1 in waiting PERMITTED
+        Assert.assertEquals(4 * DatabaseDescriptor.getCommitLogSegmentSize(), cdcMgr.updateCDCTotalSize());
+        CommitLog.instance.forceRecycleAllSegments();
+        expectCurrentCDCState(CDCState.PERMITTED);
+
+        // On flush, PERMITTED is deleted, CONTAINS is preserved.
+        cdcMgr.awaitManagementTasksCompletion();
+        int seen = getCDCRawCount();
+        Assert.assertTrue("Expected >3 files in cdc_raw, saw: " + seen, seen >= 3);
+    }
+
+    @Test
+    public void testSegmentFlaggingOnCreation() throws Throwable
+    {
+        CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+        String ct = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+
+        int origSize = DatabaseDescriptor.getCDCSpaceInMB();
+        try
+        {
+            DatabaseDescriptor.setCDCSpaceInMB(16);
+            CFMetaData ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(ct).metadata;
+            // Spin until we hit CDC capacity and make sure we get a WriteTimeout
+            try
+            {
+                for (int i = 0; i < 1000; i++)
+                {
+                    new RowUpdateBuilder(ccfm, 0, i)
+                        .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                        .build().apply();
+                }
+                Assert.fail("Expected WriteTimeoutException from full CDC but did not receive it.");
+            }
+            catch (WriteTimeoutException e) { }
+
+            expectCurrentCDCState(CDCState.FORBIDDEN);
+            CommitLog.instance.forceRecycleAllSegments();
+
+            cdcMgr.awaitManagementTasksCompletion();
+            new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()[0].delete();
+            cdcMgr.updateCDCTotalSize();
+            // Confirm cdc update process changes flag on active segment
+            expectCurrentCDCState(CDCState.PERMITTED);
+
+            // Clear out archived CDC files
+            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) {
+                FileUtils.deleteWithConfirm(f);
+            }
+
+            // Set space to 0, confirm newly allocated segments are FORBIDDEN
+            DatabaseDescriptor.setCDCSpaceInMB(0);
+            CommitLog.instance.forceRecycleAllSegments();
+            CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+            expectCurrentCDCState(CDCState.FORBIDDEN);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCSpaceInMB(origSize);
+        }
+    }
+
+    private ByteBuffer randomizeBuffer(int size)
+    {
+        byte[] toWrap = new byte[size];
+        random.nextBytes(toWrap);
+        return ByteBuffer.wrap(toWrap);
+    }
+
+    private int getCDCRawCount()
+    {
+        return new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length;
+    }
+
+    private void expectCurrentCDCState(CDCState state)
+    {
+        Assert.assertEquals("Received unexpected CDCState on current allocatingFrom segment.",
+            state, CommitLog.instance.segmentManager.allocatingFrom.getCDCState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
index 6a4aace..b777389 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -23,9 +23,14 @@ package org.apache.cassandra.db.commitlog;
 import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
-
 import javax.naming.ConfigurationException;
 
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.Config.CommitLogSync;
@@ -41,12 +46,6 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import com.google.common.collect.ImmutableMap;
 
 @RunWith(BMUnitRunner.class)
 public class CommitLogSegmentManagerTest
@@ -99,9 +98,9 @@ public class CommitLogSegmentManagerTest
         });
         dummyThread.start();
 
-        CommitLogSegmentManager clsm = CommitLog.instance.allocator;
+        AbstractCommitLogSegmentManager clsm = CommitLog.instance.segmentManager;
 
-        //Protect against delay, but still break out as fast as possible
+        // Protect against delay, but still break out as fast as possible
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < 5000)
         {
@@ -110,11 +109,11 @@ public class CommitLogSegmentManagerTest
         }
         Thread.sleep(1000);
 
-        //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
+        // Should only be able to create 3 segments (not 7) because it blocks waiting for truncation that never comes.
         Assert.assertEquals(3, clsm.getActiveSegments().size());
 
-        clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));
+        clsm.getActiveSegments().forEach(segment -> clsm.recycleSegment(segment));
 
         Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index caa9fee..eff972d 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -47,13 +47,10 @@ import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.security.EncryptionContext;
@@ -64,6 +61,7 @@ import org.apache.cassandra.utils.KillerForTests;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import org.junit.After;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -76,6 +74,9 @@ public class CommitLogTest
     private static final String STANDARD1 = "Standard1";
     private static final String STANDARD2 = "Standard2";
 
+    private static JVMStabilityInspector.Killer oldKiller;
+    private static KillerForTests testKiller;
+
     public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
     {
         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
@@ -94,7 +95,7 @@ public class CommitLogTest
     }
 
     @BeforeClass
-    public static void defineSchema() throws ConfigurationException
+    public static void beforeClass() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
@@ -106,19 +107,38 @@ public class CommitLogTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
                                     SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
         CompactionManager.instance.disableAutoCompaction();
+
+        testKiller = new KillerForTests();
+
+        // While we don't want the JVM to be nuked from under us on a test failure, we DO want some indication of
+        // an error. If we hit a "Kill the JVM" condition while working with the CL when we don't expect it, an aggressive
+        // KillerForTests will assertion out on us.
+        oldKiller = JVMStabilityInspector.replaceKiller(testKiller);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        JVMStabilityInspector.replaceKiller(oldKiller);
     }
 
     @Before
-    public void setup() throws IOException
+    public void beforeTest() throws IOException
     {
         CommitLog.instance.resetUnsafe(true);
     }
 
+    @After
+    public void afterTest()
+    {
+        testKiller.reset();
+    }
+
     @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
         runExpecting(() -> {
-            CommitLog.instance.recover(tmpFile(CommitLogDescriptor.current_version));
+            CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.current_version));
             return null;
         }, CommitLogReplayException.class);
     }
@@ -126,7 +146,7 @@ public class CommitLogTest
     @Test
     public void testRecoveryWithEmptyLog20() throws Exception
     {
-        CommitLog.instance.recover(tmpFile(CommitLogDescriptor.VERSION_20));
+        CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.VERSION_20));
     }
 
     @Test
@@ -202,8 +222,9 @@ public class CommitLogTest
     @Test
     public void testDontDeleteIfDirty() throws Exception
     {
-        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+        Keyspace ks = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs1 = ks.getColumnFamilyStore(STANDARD1);
+        ColumnFamilyStore cfs2 = ks.getColumnFamilyStore(STANDARD2);
 
         // Roughly 32 MB mutation
         Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
@@ -225,39 +246,40 @@ public class CommitLogTest
                       .build();
         CommitLog.instance.add(m2);
 
-        assertEquals(2, CommitLog.instance.activeSegments());
+        assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
 
         UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition());
 
         // Assert we still have both our segments
-        assertEquals(2, CommitLog.instance.activeSegments());
+        assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
     }
 
     @Test
     public void testDeleteIfNotDirty() throws Exception
     {
-        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+        Keyspace ks = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs1 = ks.getColumnFamilyStore(STANDARD1);
+        ColumnFamilyStore cfs2 = ks.getColumnFamilyStore(STANDARD2);
 
         // Roughly 32 MB mutation
-        Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
-                      .clustering("bytes")
-                      .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
-                      .build();
+         Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                  .clustering("bytes")
+                  .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
+                  .build();
 
         // Adding it twice (won't change segment)
         CommitLog.instance.add(rm);
         CommitLog.instance.add(rm);
 
-        assertEquals(1, CommitLog.instance.activeSegments());
+        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
         CommitLog.instance.sync(true);
-        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition());
 
-        assertEquals(1, CommitLog.instance.activeSegments());
+        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
 
         // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
         Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@ -269,16 +291,16 @@ public class CommitLogTest
         CommitLog.instance.add(rm2);
         CommitLog.instance.add(rm2);
 
-        assertEquals(3, CommitLog.instance.activeSegments());
+        assertEquals(3, CommitLog.instance.segmentManager.getActiveSegments().size());
 
         // "Flush" second cf: The first segment should be deleted since we
         // didn't write anything on cf1 since last flush (and we flush cf2)
 
         UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getCurrentPosition());
 
         // Assert we still have both our segment
-        assertEquals(1, CommitLog.instance.activeSegments());
+        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
     }
 
     private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
@@ -325,7 +347,8 @@ public class CommitLogTest
     @Test(expected = IllegalArgumentException.class)
     public void testExceedRecordLimit() throws Exception
     {
-        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        Keyspace ks = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = ks.getColumnFamilyStore(STANDARD1);
         Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
                       .clustering("bytes")
                       .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
@@ -460,9 +483,6 @@ public class CommitLogTest
 
     protected void runExpecting(Callable<Void> r, Class<?> expected)
     {
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-
         Throwable caught = null;
         try
         {
@@ -477,8 +497,7 @@ public class CommitLogTest
         if (expected != null && caught == null)
             Assert.fail("Expected exception " + expected + " but call completed successfully.");
 
-        JVMStabilityInspector.replaceKiller(originalKiller);
-        assertEquals("JVM killed", expected != null, killerForTests.wasKilled());
+        assertEquals("JVM kill state doesn't match expectation.", expected != null, testKiller.wasKilled());
     }
 
     protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
@@ -498,7 +517,7 @@ public class CommitLogTest
             raf.write(logData);
             raf.close();
 
-            CommitLog.instance.recover(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+            CommitLog.instance.recoverFiles(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
         }
     }
 
@@ -510,8 +529,9 @@ public class CommitLogTest
         {
             boolean prev = DatabaseDescriptor.isAutoSnapshot();
             DatabaseDescriptor.setAutoSnapshot(false);
-            ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-            ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+            Keyspace ks = Keyspace.open(KEYSPACE1);
+            ColumnFamilyStore cfs1 = ks.getColumnFamilyStore(STANDARD1);
+            ColumnFamilyStore cfs2 = ks.getColumnFamilyStore(STANDARD2);
 
             new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
             cfs1.truncateBlocking();
@@ -524,13 +544,13 @@ public class CommitLogTest
             for (int i = 0 ; i < 5 ; i++)
                 CommitLog.instance.add(m2);
 
-            assertEquals(2, CommitLog.instance.activeSegments());
-            ReplayPosition position = CommitLog.instance.getContext();
-            for (Keyspace ks : Keyspace.system())
-                for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+            assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size());
+            CommitLogPosition position = CommitLog.instance.getCurrentPosition();
+            for (Keyspace keyspace : Keyspace.system())
+                for (ColumnFamilyStore syscfs : keyspace.getColumnFamilyStores())
                     CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
             CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
-            assertEquals(1, CommitLog.instance.activeSegments());
+            assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
         }
         finally
         {
@@ -588,12 +608,12 @@ public class CommitLogTest
 
         CommitLog.instance.sync(true);
 
-        Replayer replayer = new Replayer(CommitLog.instance, ReplayPosition.NONE);
+        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE);
         List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
         Assert.assertFalse(activeSegments.isEmpty());
 
-        File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
-        replayer.recover(files);
+        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+        replayer.replayFiles(files);
 
         assertEquals(cellCount, replayer.cells);
     }
@@ -604,7 +624,7 @@ public class CommitLogTest
         int cellCount = 0;
         int max = 1024;
         int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
-        ReplayPosition replayPosition = null;
+        CommitLogPosition commitLogPosition = null;
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 
         for (int i = 0; i < max; i++)
@@ -613,10 +633,10 @@ public class CommitLogTest
                                  .clustering("bytes")
                                  .add("val", bytes("this is a string"))
                                  .build();
-            ReplayPosition position = CommitLog.instance.add(rm1);
+            CommitLogPosition position = CommitLog.instance.add(rm1);
 
             if (i == discardPosition)
-                replayPosition = position;
+                commitLogPosition = position;
             if (i > discardPosition)
             {
                 cellCount += 1;
@@ -625,30 +645,33 @@ public class CommitLogTest
 
         CommitLog.instance.sync(true);
 
-        Replayer replayer = new Replayer(CommitLog.instance, replayPosition);
+        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition);
         List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
         Assert.assertFalse(activeSegments.isEmpty());
 
-        File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
-        replayer.recover(files);
+        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+        replayer.replayFiles(files);
 
         assertEquals(cellCount, replayer.cells);
     }
 
-    class Replayer extends CommitLogReplayer
+    class SimpleCountingReplayer extends CommitLogReplayer
     {
-        private final ReplayPosition filterPosition;
+        private final CommitLogPosition filterPosition;
+        private CommitLogReader reader;
         int cells;
         int skipped;
 
-        Replayer(CommitLog commitLog, ReplayPosition filterPosition)
+        SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition)
         {
             super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
             this.filterPosition = filterPosition;
+            this.reader = new CommitLogReader();
         }
 
         @SuppressWarnings("resource")
-        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
+        @Override
+        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
         {
             if (entryLocation <= filterPosition.position)
             {
@@ -656,10 +679,7 @@ public class CommitLogTest
                 skipped++;
                 return;
             }
-
-            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
-            Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
-            for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+            for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
                 for (Row row : partitionUpdate)
                     cells += Iterables.size(row.cells());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..9a22b04 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -22,13 +22,12 @@ import java.io.File;
 import java.io.IOException;
 
 import com.google.common.base.Predicate;
-
 import org.junit.Assert;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 
 /**
@@ -36,44 +35,44 @@ import org.apache.cassandra.io.util.RebufferingInputStream;
  */
 public class CommitLogTestReplayer extends CommitLogReplayer
 {
-    public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
+    private final Predicate<Mutation> processor;
+
+    public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
     {
+        super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
         CommitLog.instance.sync(true);
 
-        CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
-        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
-        replayer.recover(commitLogDir.listFiles());
-    }
-
-    final private Predicate<Mutation> processor;
-
-    public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor)
-    {
-        this(log, ReplayPosition.NONE, processor);
+        this.processor = processor;
+        commitLogReader = new CommitLogTestReader();
     }
 
-    public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor)
+    public void examineCommitLog() throws IOException
     {
-        super(log, discardedPos, null, ReplayFilter.create());
-        this.processor = processor;
+        replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
     }
 
-    @Override
-    void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+    private class CommitLogTestReader extends CommitLogReader
     {
-        RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
-        Mutation mutation;
-        try
-        {
-            mutation = Mutation.serializer.deserialize(bufIn,
-                                                           desc.getMessagingVersion(),
-                                                           SerializationHelper.Flag.LOCAL);
-            Assert.assertTrue(processor.apply(mutation));
-        }
-        catch (IOException e)
+        @Override
+        protected void readMutation(CommitLogReadHandler handler,
+                                    byte[] inputBuffer,
+                                    int size,
+                                    CommitLogPosition minPosition,
+                                    final int entryLocation,
+                                    final CommitLogDescriptor desc) throws IOException
         {
-            // Test fails.
-            throw new AssertionError(e);
+            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+            Mutation mutation;
+            try
+            {
+                mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+                Assert.assertTrue(processor.apply(mutation));
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index a49c4cf..90e4ffc 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -137,10 +137,13 @@ public class CommitLogUpgradeTest
     @Test
     public void test22_bitrot_ignored() throws Exception
     {
-        try {
+        try
+        {
             System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
             testRestore(DATA_DIR + "2.2-lz4-bitrot");
-        } finally {
+        }
+        finally
+        {
             System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
         }
     }
@@ -155,10 +158,13 @@ public class CommitLogUpgradeTest
     @Test
     public void test22_bitrot2_ignored() throws Exception
     {
-        try {
+        try
+        {
             System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
             testRestore(DATA_DIR + "2.2-lz4-bitrot2");
-        } finally {
+        }
+        finally
+        {
             System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
         }
     }
@@ -199,9 +205,9 @@ public class CommitLogUpgradeTest
         }
 
         Hasher hasher = new Hasher();
-        CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, hasher);
+        CommitLogTestReplayer replayer = new CommitLogTestReplayer(hasher);
         File[] files = new File(location).listFiles((file, name) -> name.endsWith(".log"));
-        replayer.recover(files);
+        replayer.replayFiles(files);
 
         Assert.assertEquals(cells, hasher.cells);
         Assert.assertEquals(hash, hasher.hash);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index c8a6033..5a03f9f 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@ -219,7 +219,7 @@ public class CommitLogUpgradeTestMaker
         int dataSize = 0;
         final CommitLog commitLog;
 
-        volatile ReplayPosition rp;
+        volatile CommitLogPosition clsp;
 
         public CommitlogExecutor(CommitLog commitLog)
         {
@@ -248,7 +248,7 @@ public class CommitLogUpgradeTestMaker
                     dataSize += sz;
                 }
 
-                rp = commitLog.add((Mutation)builder.makeMutation());
+                clsp = commitLog.add((Mutation)builder.makeMutation());
                 counter.incrementAndGet();
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
index 3ec0db2..034566e 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
@@ -31,9 +31,9 @@ import javax.crypto.Cipher;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.db.commitlog.SegmentReader.CompressedSegmenter;
-import org.apache.cassandra.db.commitlog.SegmentReader.EncryptedSegmenter;
-import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentReader.CompressedSegmenter;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentReader.EncryptedSegmenter;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentReader.SyncSegment;
 import org.apache.cassandra.io.compress.DeflateCompressor;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 902e17a..1668ddc 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.*;
@@ -266,21 +266,21 @@ public class TrackerTest
         Tracker tracker = cfs.getTracker();
         tracker.subscribe(listener);
 
-        Memtable prev1 = tracker.switchMemtable(true, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfs));
+        Memtable prev1 = tracker.switchMemtable(true, new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), cfs));
         OpOrder.Group write1 = cfs.keyspace.writeOrder.getCurrent();
         OpOrder.Barrier barrier1 = cfs.keyspace.writeOrder.newBarrier();
-        prev1.setDiscarding(barrier1, new AtomicReference<>(CommitLog.instance.getContext()));
+        prev1.setDiscarding(barrier1, new AtomicReference<>(CommitLog.instance.getCurrentPosition()));
         barrier1.issue();
-        Memtable prev2 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfs));
+        Memtable prev2 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), cfs));
         OpOrder.Group write2 = cfs.keyspace.writeOrder.getCurrent();
         OpOrder.Barrier barrier2 = cfs.keyspace.writeOrder.newBarrier();
-        prev2.setDiscarding(barrier2, new AtomicReference<>(CommitLog.instance.getContext()));
+        prev2.setDiscarding(barrier2, new AtomicReference<>(CommitLog.instance.getCurrentPosition()));
         barrier2.issue();
         Memtable cur = tracker.getView().getCurrentMemtable();
         OpOrder.Group writecur = cfs.keyspace.writeOrder.getCurrent();
-        Assert.assertEquals(prev1, tracker.getMemtableFor(write1, ReplayPosition.NONE));
-        Assert.assertEquals(prev2, tracker.getMemtableFor(write2, ReplayPosition.NONE));
-        Assert.assertEquals(cur, tracker.getMemtableFor(writecur, ReplayPosition.NONE));
+        Assert.assertEquals(prev1, tracker.getMemtableFor(write1, CommitLogPosition.NONE));
+        Assert.assertEquals(prev2, tracker.getMemtableFor(write2, CommitLogPosition.NONE));
+        Assert.assertEquals(cur, tracker.getMemtableFor(writecur, CommitLogPosition.NONE));
         Assert.assertEquals(2, listener.received.size());
         Assert.assertTrue(listener.received.get(0) instanceof MemtableRenewedNotification);
         Assert.assertTrue(listener.received.get(1) instanceof MemtableSwitchedNotification);
@@ -316,7 +316,7 @@ public class TrackerTest
         tracker = cfs.getTracker();
         listener = new MockListener(false);
         tracker.subscribe(listener);
-        prev1 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfs));
+        prev1 = tracker.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), cfs));
         tracker.markFlushing(prev1);
         reader = MockSchema.sstable(0, 10, true, cfs);
         cfs.invalidate(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java b/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
index 83b8481..beb95d1 100644
--- a/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsEncryptionTest.java
@@ -42,7 +42,7 @@ public class HintsEncryptionTest extends AlteredHints
     }
 
     @Test
-    public void encyptedHints() throws Exception
+    public void encryptedHints() throws Exception
     {
         multiFlushAndDeserializeTest();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 93365ef..a3382c4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -80,8 +80,8 @@ public class MetadataSerializerTest
 
     public Map<MetadataType, MetadataComponent> constructMetadata()
     {
-        ReplayPosition club = new ReplayPosition(11L, 12);
-        ReplayPosition cllb = new ReplayPosition(9L, 12);
+        CommitLogPosition club = new CommitLogPosition(11L, 12);
+        CommitLogPosition cllb = new CommitLogPosition(9L, 12);
 
         CFMetaData cfm = SchemaLoader.standardCFMD("ks1", "cf1");
         MetadataCollector collector = new MetadataCollector(cfm.comparator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/utils/KillerForTests.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/KillerForTests.java b/test/unit/org/apache/cassandra/utils/KillerForTests.java
index abc7952..fe9aa45 100644
--- a/test/unit/org/apache/cassandra/utils/KillerForTests.java
+++ b/test/unit/org/apache/cassandra/utils/KillerForTests.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import org.junit.Assert;
+
 /**
  * Responsible for stubbing out the System.exit() logic during unit tests.
  */
@@ -25,10 +27,24 @@ public class KillerForTests extends JVMStabilityInspector.Killer
 {
     private boolean killed = false;
     private boolean quiet = false;
+    private final boolean expected;
+
+    public KillerForTests()
+    {
+        expected = true;
+    }
+
+    public KillerForTests(boolean expectFailure)
+    {
+        expected = expectFailure;
+    }
 
     @Override
     protected void killCurrentJVM(Throwable t, boolean quiet)
     {
+        if (!expected)
+            Assert.fail("Saw JVM Kill but did not expect it.");
+
         this.killed = true;
         this.quiet = quiet;
     }


Mime
View raw message