cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ble...@apache.org
Subject [3/3] cassandra git commit: Merge branch cassandra-3.0 into cassandra-3.7
Date Thu, 02 Jun 2016 10:50:55 GMT
Merge branch cassandra-3.0 into cassandra-3.7


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc6ffc25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc6ffc25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc6ffc25

Branch: refs/heads/cassandra-3.7
Commit: dc6ffc25a8d00659385a1219d0189bd068ef110d
Parents: dbf0310 1e82695
Author: Benjamin Lerer <b.lerer@gmail.com>
Authored: Thu Jun 2 12:47:03 2016 +0200
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Thu Jun 2 12:50:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       | 102 +++++++++-
 .../db/commitlog/CommitLogSegment.java          |  15 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  17 +-
 .../db/commitlog/CompressedSegment.java         |   4 +-
 .../db/commitlog/EncryptedSegment.java          |   4 +-
 .../db/commitlog/CommitLogStressTest.java       |  12 +-
 .../db/RecoveryManagerFlushedTest.java          |  40 ++++
 .../db/RecoveryManagerMissingHeaderTest.java    |  38 +++-
 .../cassandra/db/RecoveryManagerTest.java       | 167 ++++++++++-------
 .../db/RecoveryManagerTruncateTest.java         |  38 ++++
 .../db/commitlog/CommitLogDescriptorTest.java   |   3 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 187 ++++++-------------
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   4 +-
 14 files changed, 407 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a54f4fd,70da4ad..2a66eb4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,11 -20,10 +22,12 @@@ Merged from 2.2
   * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
   * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
   * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
   * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
  Merged from 2.1:
++ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: apply current keyspace to source command (CASSANDRA-11152)
 - * Backport CASSANDRA-11578 (CASSANDRA-11750)
   * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
   * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
   * Do not consider local node a valid source during replace (CASSANDRA-11848)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 10bc91a,dcdd855..4a660ca
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -96,13 -92,10 +94,11 @@@ public class CommitLog implements Commi
      @VisibleForTesting
      CommitLog(String location, CommitLogArchiver archiver)
      {
-         compressorClass = DatabaseDescriptor.getCommitLogCompression();
          this.location = location;
-         ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
 -        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++                                               DatabaseDescriptor.getEncryptionContext());
          DatabaseDescriptor.createAllDirectories();
-         encryptionContext = DatabaseDescriptor.getEncryptionContext();
  
-         this.compressor = compressor;
          this.archiver = archiver;
          metrics = new CommitLogMetrics();
  
@@@ -146,7 -139,7 +142,8 @@@
          };
  
          // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
--        for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter))
++        File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
++        for (File file : listFiles)
          {
              archiver.maybeArchive(file.getPath(), file.getName());
              archiver.maybeWaitForArchiving(file.getName());
@@@ -420,6 -413,6 +418,15 @@@
      }
  
      /**
++     * FOR TESTING PURPOSES.
++     */
++    public void resetConfiguration()
++    {
++        configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++                                          DatabaseDescriptor.getEncryptionContext());
++    }
++
++    /**
       * FOR TESTING PURPOSES. See CommitLogAllocator.
       */
      public void stopUnsafe(boolean deleteSegments)
@@@ -492,4 -493,59 +499,83 @@@
                  throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
          }
      }
+ 
+     public static final class Configuration
+     {
+         /**
+          * The compressor class.
+          */
+         private final ParameterizedClass compressorClass;
+ 
+         /**
+          * The compressor used to compress the segments.
+          */
+         private final ICompressor compressor;
+ 
 -        public Configuration(ParameterizedClass compressorClass)
++        /**
++         * The encryption context used to encrypt the segments.
++         */
++        private EncryptionContext encryptionContext;
++
++        public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext)
+         {
+             this.compressorClass = compressorClass;
+             this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
++            this.encryptionContext = encryptionContext;
+         }
+ 
+         /**
+          * Checks if the segments must be compressed.
+          * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+          */
+         public boolean useCompression()
+         {
+             return compressor != null;
+         }
+ 
+         /**
++         * Checks if the segments must be encrypted.
++         * @return <code>true</code> if the segments must be encrypted, <code>false</code> otherwise.
++         */
++        public boolean useEncryption()
++        {
++            return encryptionContext.isEnabled();
++        }
++
++        /**
+          * Returns the compressor used to compress the segments.
+          * @return the compressor used to compress the segments
+          */
+         public ICompressor getCompressor()
+         {
+             return compressor;
+         }
+ 
+         /**
+          * Returns the compressor class.
+          * @return the compressor class
+          */
+         public ParameterizedClass getCompressorClass()
+         {
+             return compressorClass;
+         }
+ 
+         /**
+          * Returns the compressor name.
+          * @return the compressor name.
+          */
+         public String getCompressorName()
+         {
+             return useCompression() ? compressor.getClass().getSimpleName() : "none";
+         }
++
++        /**
++         * Returns the encryption context used to encrypt the segments.
++         * @return the encryption context used to encrypt the segments
++         */
++        public EncryptionContext getEncryptionContext()
++        {
++            return encryptionContext;
++        }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 8f8b523,27c05b4..2045c35
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -46,6 -45,6 +46,7 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.util.FileUtils;
@@@ -122,11 -120,8 +123,12 @@@ public abstract class CommitLogSegmen
  
      static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
      {
-         CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) :
-                commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) :
-                                               new MemoryMappedSegment(commitLog);
 -        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
 -                                                        : new MemoryMappedSegment(commitLog);
++        Configuration config = commitLog.configuration;
++        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose)
++                                                          : config.useCompression() ? new CompressedSegment(commitLog, onClose)
++                                                                                    : new MemoryMappedSegment(commitLog);
 +        segment.writeLogHeader();
 +        return segment;
      }
  
      /**
@@@ -137,7 -132,7 +139,8 @@@
       */
      static boolean usesBufferPool(CommitLog commitLog)
      {
-         return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null;
 -        return commitLog.configuration.useCompression();
++        Configuration config = commitLog.configuration;
++        return config.useEncryption() || config.useCompression();
      }
  
      static long getNextId()
@@@ -152,7 -149,7 +155,9 @@@
      {
          this.commitLog = commitLog;
          id = getNextId();
-         descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
 -        descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
++        descriptor = new CommitLogDescriptor(id,
++                                             commitLog.configuration.getCompressorClass(),
++                                             commitLog.configuration.getEncryptionContext());
          logFile = new File(commitLog.location, descriptor.fileName());
  
          try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 573428a,c73a30a..684fc2c
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -46,8 -68,18 +46,8 @@@ public class CompressedSegment extends 
       */
      CompressedSegment(CommitLog commitLog, Runnable onClose)
      {
 -        super(commitLog);
 +        super(commitLog, onClose);
-         this.compressor = commitLog.compressor;
+         this.compressor = commitLog.configuration.getCompressor();
 -        this.onClose = onClose;
 -        try
 -        {
 -            channel.write((ByteBuffer) buffer.duplicate().flip());
 -            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
 -        }
 -        catch (IOException e)
 -        {
 -            throw new FSWriteError(e, getPath());
 -        }
      }
  
      ByteBuffer allocate(int size)
@@@ -57,9 -89,21 +57,9 @@@
  
      ByteBuffer createBuffer(CommitLog commitLog)
      {
-         return createBuffer(commitLog.compressor.preferredBufferType());
 -        usedBuffers.incrementAndGet();
 -        ByteBuffer buf = bufferPool.poll();
 -        if (buf == null)
 -        {
 -            // this.compressor is not yet set, so we must use the commitLog's one.
 -            buf = commitLog.configuration.getCompressor()
 -                                         .preferredBufferType()
 -                                         .allocate(DatabaseDescriptor.getCommitLogSegmentSize());
 -        } else
 -            buf.clear();
 -        return buf;
++        return createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
      }
  
 -    static long startMillis = System.currentTimeMillis();
 -
      @Override
      void write(int startMarker, int nextMarker)
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 731dea4,0000000..c34a365
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,161 -1,0 +1,161 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Map;
 +import javax.crypto.Cipher;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.security.EncryptionUtils;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Hex;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
 +
 +/**
 + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into
 + * the encryption algorithms.
 + *
 + * The format of the encrypted commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
 + * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a series of encrypted data blocks, each of which contains:
 + * --- the length of the encrypted block (cipher text)
 + * --- the length of the unencrypted data (compressed text)
 + * --- the encrypted block, which contains:
 + * ---- the length of the plain text (raw) data
 + * ---- block of compressed data
 + *
 + * Notes:
 + * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
 + * to the output buffer, and we need to ignore that padding when processing.
 + */
 +public class EncryptedSegment extends FileDirectSegment
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
 +
 +    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
 +
 +    private final EncryptionContext encryptionContext;
 +    private final Cipher cipher;
 +
-     public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose)
++    public EncryptedSegment(CommitLog commitLog, Runnable onClose)
 +    {
 +        super(commitLog, onClose);
-         this.encryptionContext = encryptionContext;
++        this.encryptionContext = commitLog.configuration.getEncryptionContext();
 +
 +        try
 +        {
 +            cipher = encryptionContext.getEncryptor();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, logFile);
 +        }
 +        logger.debug("created a new encrypted commit log segment: {}", logFile);
 +    }
 +
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        Map<String, String> map = encryptionContext.toHeaderParameters();
 +        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
 +        return map;
 +    }
 +
 +    ByteBuffer createBuffer(CommitLog commitLog)
 +    {
 +        //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
 +        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
 +        return createBuffer(BufferType.ON_HEAP);
 +    }
 +
 +    void write(int startMarker, int nextMarker)
 +    {
 +        int contentStart = startMarker + SYNC_MARKER_SIZE;
 +        final int length = nextMarker - contentStart;
 +        // The length may be 0 when the segment is being closed.
 +        assert length > 0 || length == 0 && !isStillAllocating();
 +
 +        final ICompressor compressor = encryptionContext.getCompressor();
 +        final int blockSize = encryptionContext.getChunkLength();
 +        try
 +        {
 +            ByteBuffer inputBuffer = buffer.duplicate();
 +            inputBuffer.limit(contentStart + length).position(contentStart);
 +            ByteBuffer buffer = reusableBufferHolder.get();
 +
 +            // save space for the sync marker at the beginning of this section
 +            final long syncMarkerPosition = lastWrittenPos;
 +            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
 +
 +            // loop over the segment data in encryption buffer sized chunks
 +            while (contentStart < nextMarker)
 +            {
 +                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
 +                ByteBuffer slice = inputBuffer.duplicate();
 +                slice.limit(contentStart + nextBlockSize).position(contentStart);
 +
 +                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
 +
 +                // reuse the same buffer for the input and output of the encryption operation
 +                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
 +
 +                contentStart += nextBlockSize;
 +                commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
 +            }
 +
 +            lastWrittenPos = channel.position();
 +
 +            // rewind to the beginning of the section and write out the sync marker,
 +            // reusing the one of the existing buffers
 +            buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
 +            writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
 +            buffer.putInt(SYNC_MARKER_SIZE, length);
 +            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
 +            commitLog.allocator.addSize(buffer.limit());
 +
 +            channel.position(syncMarkerPosition);
 +            channel.write(buffer);
 +
 +            SyncUtil.force(channel, true);
 +
 +            if (reusableBufferHolder.get().capacity() < buffer.capacity())
 +                reusableBufferHolder.set(buffer);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    public long onDiskSize()
 +    {
 +        return lastWrittenPos;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 8e45eea,d517055..0474b32
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -200,43 -198,34 +200,43 @@@ public class CommitLogStressTes
          DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
          DatabaseDescriptor.setCommitLogSyncPeriod(30);
          DatabaseDescriptor.setCommitLogSegmentSize(32);
 -        for (ParameterizedClass compressor : new ParameterizedClass[] {
 -                null,
 -                new ParameterizedClass("LZ4Compressor", null),
 -                new ParameterizedClass("SnappyCompressor", null),
 -                new ParameterizedClass("DeflateCompressor", null) })
 +
 +        // test plain vanilla commit logs (the choice of 98% of users)
 +        testLog(null, EncryptionContextGenerator.createDisabledContext());
 +
 +        // test the compression types
 +        testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext());
 +        testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext());
 +        testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext());
 +
 +        // test the encrypted commit log
 +        testLog(null, EncryptionContextGenerator.createContext(true));
 +    }
 +
 +    public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException
 +    {
 +        DatabaseDescriptor.setCommitLogCompression(compression);
 +        DatabaseDescriptor.setEncryptionContext(encryptionContext);
 +        for (CommitLogSync sync : CommitLogSync.values())
          {
 -            DatabaseDescriptor.setCommitLogCompression(compressor);
 -            for (CommitLogSync sync : CommitLogSync.values())
 -            {
 -                DatabaseDescriptor.setCommitLogSync(sync);
 -                CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
 -                testLog(commitLog);
 -            }
 +            DatabaseDescriptor.setCommitLogSync(sync);
 +            CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
 +            testLog(commitLog);
 +            assert !failed;
          }
 -        assert !failed;
      }
  
 -    public void testLog(CommitLog commitLog) throws IOException, InterruptedException
 -    {
 -        System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
 -                          mb(DatabaseDescriptor.getCommitLogSegmentSize()),
 -                          commitLog.configuration.getCompressorName(),
 -                          commitLog.executor.getClass().getSimpleName(),
 -                          randomSize ? " random size" : "",
 -                          discardedRun ? " with discarded run" : "");
 +    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
 +        System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
 +                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                            commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                            commitLog.encryptionContext.isEnabled(),
++                           commitLog.configuration.getCompressorName(),
++                           commitLog.configuration.useEncryption(),
 +                           commitLog.executor.getClass().getSimpleName(),
 +                           randomSize ? " random size" : "",
 +                           discardedRun ? " with discarded run" : "");
          commitLog.allocator.enableReserveSegmentCreation();
 -
 -        final List<CommitlogExecutor> threads = new ArrayList<>();
 +        
 +        final List<CommitlogThread> threads = new ArrayList<>();
          ScheduledExecutorService scheduled = startThreads(commitLog, threads);
  
          discardedPos = ReplayPosition.NONE;
@@@ -294,17 -282,14 +294,17 @@@
                  Assert.fail("Failed to delete " + f);
  
          if (hash == repl.hash && cells == repl.cells)
 -            System.out.println("Test success.");
 +            System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
-                               commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                               commitLog.encryptionContext.isEnabled(),
++                              commitLog.configuration.getCompressorName(),
++                              commitLog.configuration.useEncryption(),
 +                              repl.discarded, repl.skipped);
          else
          {
 -            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
 -                              repl.cells,
 -                              cells,
 -                              repl.hash,
 -                              hash);
 +            System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d -  hash %d expected %d.\n",
-                               commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                               commitLog.encryptionContext.isEnabled(),
++                              commitLog.configuration.getCompressorName(),
++                              commitLog.configuration.useEncryption(),
 +                              repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
 +                              repl.hash, hash);
              failed = true;
          }
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,d06c112..86fa5b4
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -25,13 -34,19 +34,21 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
  import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.schema.SchemaKeyspace;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
  import org.apache.cassandra.utils.FBUtilities;
  
+ @RunWith(Parameterized.class)
  public class RecoveryManagerFlushedTest
  {
      private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
@@@ -40,14 -55,35 +57,37 @@@
      private static final String CF_STANDARD1 = "Standard1";
      private static final String CF_STANDARD2 = "Standard2";
  
 -    @BeforeClass
 -    public static void defineSchema() throws ConfigurationException
++    public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
 -        SchemaLoader.prepareServer();
 -        SchemaLoader.createKeyspace(KEYSPACE1,
 -                                    KeyspaceParams.simple(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
 -    public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++    @Parameters()
++    public static Collection<Object[]> generateData()
+     {
 -        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
+ 
+     @Before
+     public void setUp() throws IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+     }
+ 
 -    @Parameters()
 -    public static Collection<Object[]> generateData()
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
      {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
      }
  
      @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,8ac7c5d..a67e9e5
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -28,13 -35,17 +35,19 @@@ import org.junit.runners.Parameterized.
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+ import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
  
+ @RunWith(Parameterized.class)
  public class RecoveryManagerMissingHeaderTest
  {
      private static final String KEYSPACE1 = "RecoveryManager3Test1";
@@@ -43,6 -54,27 +56,29 @@@
      private static final String KEYSPACE2 = "RecoveryManager3Test2";
      private static final String CF_STANDARD3 = "Standard3";
  
 -    public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++    public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
 -    @Before
 -    public void setUp() throws IOException
++    @Parameters()
++    public static Collection<Object[]> generateData()
+     {
 -        CommitLog.instance.resetUnsafe(true);
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
+ 
 -    @Parameters()
 -    public static Collection<Object[]> generateData()
++    @Before
++    public void setUp() throws IOException
+     {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++        CommitLog.instance.resetUnsafe(true);
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 5ac53f6,397030a..37d719e
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -19,40 -19,43 +19,51 @@@
  package org.apache.cassandra.db;
  
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
  import java.util.Date;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
 -
 -import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
 -import org.junit.runner.RunWith;
 -import org.junit.runners.Parameterized;
 -import org.junit.runners.Parameterized.Parameters;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicReference;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.db.commitlog.CommitLog;
 -import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.context.CounterContext;
 -import org.apache.cassandra.db.rows.Row;
 -import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
  
  import static org.junit.Assert.assertEquals;
  
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 +import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.db.commitlog.CommitLogReplayer;
 +
- @RunWith(OrderedJUnit4ClassRunner.class)
+ @RunWith(Parameterized.class)
  public class RecoveryManagerTest
  {
      private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
@@@ -123,6 -67,6 +75,29 @@@
      private static final String KEYSPACE2 = "RecoveryManagerTest2";
      private static final String CF_STANDARD3 = "Standard3";
  
++    public RecoveryManagerTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
++    {
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
++    }
++
++    @Parameters()
++    public static Collection<Object[]> generateData()
++    {
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
++    }
++
++    @Before
++    public void setUp() throws IOException
++    {
++        CommitLog.instance.resetUnsafe(true);
++    }
++
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {
@@@ -139,6 -83,6 +114,7 @@@
      @Before
      public void clearData()
      {
++        // clear data
          Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
          Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
          Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
@@@ -151,77 -103,11 +127,78 @@@
      }
  
      @Test
 -    public void testNothingToRecover() throws IOException
 +    public void testRecoverBlocksOnBytesOutstanding() throws Exception
      {
 -        CommitLog.instance.resetUnsafe(true);
 +        long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES;
 +        CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1;
 +        CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator;
++        MockInitiator mockInitiator = new MockInitiator();
 +        CommitLogReplayer.mutationInitiator = mockInitiator;
 +        try
 +        {
 +            CommitLog.instance.resetUnsafe(true);
 +            Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
 +            Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 +
 +            UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
 +                .clustering("col1").add("val", "1")
 +                .build());
 +
 +            UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
 +                                           .clustering("col2").add("val", "1")
 +                                           .build());
 +
 +            keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
 +            keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
 +
 +            DecoratedKey dk = Util.dk("keymulti");
 +            Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty());
 +            Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
 +
 +            final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
 +            Thread t = new Thread() {
 +                @Override
 +                public void run()
 +                {
 +                    try
 +                    {
 +                        CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        err.set(t);
 +                    }
 +                }
 +            };
 +            t.start();
-             Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
++            Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
 +            Thread.sleep(100);
 +            Assert.assertTrue(t.isAlive());
-             blocker.release(Integer.MAX_VALUE);
++            mockInitiator.blocker.release(Integer.MAX_VALUE);
 +            t.join(20 * 1000);
 +
 +            if (err.get() != null)
 +                throw new RuntimeException(err.get());
 +
 +            if (t.isAlive())
 +            {
 +                Throwable toPrint = new Throwable();
 +                toPrint.setStackTrace(Thread.getAllStackTraces().get(t));
 +                toPrint.printStackTrace(System.out);
 +            }
 +            Assert.assertFalse(t.isAlive());
 +
 +            Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
 +            Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
 +        }
 +        finally
 +        {
 +            CommitLogReplayer.mutationInitiator = originalInitiator;
 +            CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding;
 +        }
      }
  
 +
      @Test
      public void testOne() throws IOException
      {
@@@ -273,8 -159,8 +250,8 @@@
      @Test
      public void testRecoverPIT() throws Exception
      {
--        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          CommitLog.instance.resetUnsafe(true);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
          long timeMS = date.getTime() - 5000;
  
@@@ -301,8 -187,8 +278,8 @@@
      @Test
      public void testRecoverPITUnordered() throws Exception
      {
--        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          CommitLog.instance.resetUnsafe(true);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
          long timeMS = date.getTime();
  
@@@ -332,4 -218,4 +309,64 @@@
  
          assertEquals(2, Util.getAll(Util.cmd(cfs).build()).size());
      }
++
++    private static class MockInitiator extends CommitLogReplayer.MutationInitiator
++    {
++        final Semaphore blocker = new Semaphore(0);
++        final Semaphore blocked = new Semaphore(0);
++
++        @Override
++        protected Future<Integer> initiateMutation(final Mutation mutation,
++                final long segmentId,
++                final int serializedSize,
++                final int entryLocation,
++                final CommitLogReplayer clr)
++        {
++            final Future<Integer> toWrap = super.initiateMutation(mutation,
++                                                                  segmentId,
++                                                                  serializedSize,
++                                                                  entryLocation,
++                                                                  clr);
++            return new Future<Integer>()
++            {
++
++                @Override
++                public boolean cancel(boolean mayInterruptIfRunning)
++                {
++                    throw new UnsupportedOperationException();
++                }
++
++                @Override
++                public boolean isCancelled()
++                {
++                    throw new UnsupportedOperationException();
++                }
++
++                @Override
++                public boolean isDone()
++                {
++                    return blocker.availablePermits() > 0 && toWrap.isDone();
++                }
++
++                @Override
++                public Integer get() throws InterruptedException, ExecutionException
++                {
++                    System.out.println("Got blocker once");
++                    blocked.release();
++                    blocker.acquire();
++                    return toWrap.get();
++                }
++
++                @Override
++                public Integer get(long timeout, TimeUnit unit)
++                        throws InterruptedException, ExecutionException, TimeoutException
++                {
++                    blocked.release();
++                    blocker.tryAcquire(1, timeout, unit);
++                    return toWrap.get(timeout, unit);
++                }
++
++            };
++        }
++    };
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,5a59f1c..738888f
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,17 -19,29 +19,31 @@@
  package org.apache.cassandra.db;
  
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
  
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
 -import org.junit.runner.RunWith;
 -import org.junit.runners.Parameterized;
 -import org.junit.runners.Parameterized.Parameters;
 -
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
  import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
 +
++import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
  
 -import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.*;
  
  /**
   * Test for the truncate operation.
@@@ -39,6 -52,27 +54,29 @@@ public class RecoveryManagerTruncateTes
      private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
      private static final String CF_STANDARD1 = "Standard1";
  
 -    public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
++    public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
 -    @Before
 -    public void setUp() throws IOException
++    @Parameters()
++    public static Collection<Object[]> generateData()
+     {
 -        CommitLog.instance.resetUnsafe(true);
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
+ 
 -    @Parameters()
 -    public static Collection<Object[]> generateData()
++    @Before
++    public void setUp() throws IOException
+     {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++        CommitLog.instance.resetUnsafe(true);
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index ab9cb6f,898c19f..fdedafd
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -15,6 -15,6 +15,7 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
++
  package org.apache.cassandra.db.commitlog;
  
  import java.io.IOException;
@@@ -117,195 -83,20 +118,195 @@@ public class CommitLogDescriptorTes
      @Test
      public void testDescriptorInvalidParametersSize() throws IOException
      {
 -        final int numberOfParameters = 65535;
 -        Map<String, String> params = new HashMap<>(numberOfParameters);
 -        for (int i=0; i<numberOfParameters; ++i)
 +        Map<String, String> params = new HashMap<>();
 +        for (int i=0; i<65535; ++i)
              params.put("key"+i, Integer.toString(i, 16));
          try {
 -            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
 +            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
                                                                 21,
 -                                                               new ParameterizedClass("LZ4Compressor", params));
 +                                                               new ParameterizedClass("LZ4Compressor", params),
 +                                                               neverEnabledEncryption);
++
              ByteBuffer buf = ByteBuffer.allocate(1024000);
              CommitLogDescriptor.writeHeader(buf, desc);
 -            fail("Parameter object too long should fail on writing descriptor.");
 +            Assert.fail("Parameter object too long should fail on writing descriptor.");
          } catch (ConfigurationException e)
          {
              // correct path
          }
      }
 +
 +    @Test
 +    public void constructParametersString_NoCompressionOrEncryption()
 +    {
 +        String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap());
 +        Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
 +        Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
 +
 +        json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap());
 +        Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
 +        Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
 +    }
 +
 +    @Test
 +    public void constructParametersString_WithCompressionAndEncryption()
 +    {
 +        String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap());
 +        Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
 +        Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
 +    }
 +
 +    @Test
 +    public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertNull(result.compression);
 +        Assert.assertFalse(result.getEncryptionContext().isEnabled());
 +    }
 +
 +    @Test
 +    public void writeAndReadHeader_OnlyCompression() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertEquals(compression, result.compression);
 +        Assert.assertFalse(result.getEncryptionContext().isEnabled());
 +    }
 +
 +    @Test
 +    public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertNull(result.compression);
 +        Assert.assertTrue(result.getEncryptionContext().isEnabled());
 +        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
 +    }
 +
 +    /**
 +     * Check that even though enabledTdeOptions is disabled in the yaml, we can still read the commit log header as encrypted.
 +     */
 +    @Test
 +    public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertNull(result.compression);
 +        Assert.assertTrue(result.getEncryptionContext().isEnabled());
 +        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
 +    }
 +
 +    /**
 +     * Shouldn't happen in the real world (should only have either compression or enabledTdeOptions), but the header
 +     * functionality should be correct
 +     */
 +    @Test
 +    public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertEquals(compression, result.compression);
 +        Assert.assertTrue(result.getEncryptionContext().isEnabled());
 +        Assert.assertEquals(enabledEncryption, result.getEncryptionContext());
 +        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
 +    }
 +
 +    @Test
 +    public void equals_NoCompressionOrEncryption()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
 +
 +    @Test
 +    public void equals_OnlyCompression()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
 +
 +    @Test
 +    public void equals_OnlyEncryption()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
 +
 +    /**
 +     * Shouldn't have both enabled in real life, but ensure they are correct, nonetheless
 +     */
 +    @Test
 +    public void equals_BothCompressionAndEncryption()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
- 
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1ea0eb1,39ba886..caa9fee
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -26,9 -34,13 +26,12 @@@ import java.util.concurrent.ExecutionEx
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
  
 -import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
 +import com.google.common.collect.Iterables;
 +
 +import org.junit.*;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
@@@ -46,13 -58,11 +52,16 @@@ import org.apache.cassandra.exceptions.
  import org.apache.cassandra.io.compress.DeflateCompressor;
  import org.apache.cassandra.io.compress.LZ4Compressor;
  import org.apache.cassandra.io.compress.SnappyCompressor;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.FastByteArrayInputStream;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.security.EncryptionContextGenerator;
- import org.apache.cassandra.utils.*;
++import org.apache.cassandra.utils.Hex;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.KillerForTests;
++import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.vint.VIntCoding;
  
  import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@@ -66,7 -77,26 +76,22 @@@ public class CommitLogTes
      private static final String STANDARD1 = "Standard1";
      private static final String STANDARD2 = "Standard2";
  
-     String logDirectory;
 -    public CommitLogTest(ParameterizedClass commitLogCompression)
++    public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
 -    }
 -
 -    @Before
 -    public void setUp() throws IOException
 -    {
 -        CommitLog.instance.resetUnsafe(true);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
  
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
@@@ -83,13 -113,6 +108,12 @@@
          CompactionManager.instance.disableAutoCompaction();
      }
  
 +    @Before
 +    public void setup() throws IOException
 +    {
-         logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
-         new File(logDirectory).mkdirs();
++        CommitLog.instance.resetUnsafe(true);
 +    }
 +
      @Test
      public void testRecoveryWithEmptyLog() throws Exception
      {
@@@ -302,17 -330,25 +322,16 @@@
          CommitLog.instance.add(rm);
      }
  
 -    @Test
 +    @Test(expected = IllegalArgumentException.class)
      public void testExceedRecordLimit() throws Exception
      {
--        CommitLog.instance.resetUnsafe(true);
          ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 -        try
 -        {
 -            Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
 -                          .clustering("bytes")
 -                          .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
 -                          .build();
 -            CommitLog.instance.add(rm);
 -            throw new AssertionError("mutation larger than limit was accepted");
 -        }
 -        catch (IllegalArgumentException e)
 -        {
 -            // IAE is thrown on too-large mutations
 -        }
 +        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
 +                      .build();
 +        CommitLog.instance.add(rm);
 +        throw new AssertionError("mutation larger than limit was accepted");
      }
  
      protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@@ -333,49 -369,10 +352,50 @@@
          testRecovery(out.toByteArray(), CommitLogReplayException.class);
      }
  
 +    /**
 +     * Create a temporary commit log file with an appropriate descriptor at the head.
 +     *
 +     * @return the commit log file reference and the first position after the descriptor in the file
 +     * (so that subsequent writes happen at the correct file location).
 +     */
 +    protected Pair<File, Integer> tmpFile() throws IOException
 +    {
 +        EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
 +        CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version,
 +                                                           CommitLogSegment.getNextId(),
 +                                                           DatabaseDescriptor.getCommitLogCompression(),
 +                                                           encryptionContext);
 +
-         // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
-         Map<String, String> additionalHeaders = new HashMap<>();
-         if (encryptionContext.isEnabled())
-         {
-             byte[] buf = new byte[16];
-             new Random().nextBytes(buf);
-             additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
-         }
 +
 +        ByteBuffer buf = ByteBuffer.allocate(1024);
-         CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders);
++        CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(encryptionContext));
 +        buf.flip();
 +        int positionAfterHeader = buf.limit() + 1;
 +
-         File logFile = new File(logDirectory, desc.fileName());
-         logFile.deleteOnExit();
++        File logFile = new File(DatabaseDescriptor.getCommitLogLocation(), desc.fileName());
 +
 +        try (OutputStream lout = new FileOutputStream(logFile))
 +        {
 +            lout.write(buf.array(), 0, buf.limit());
 +        }
 +
 +        return Pair.create(logFile, positionAfterHeader);
 +    }
 +
++    private Map<String, String> getAdditionalHeaders(EncryptionContext encryptionContext)
++    {
++        if (!encryptionContext.isEnabled())
++            return Collections.emptyMap();
++
++        // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
++        byte[] buf = new byte[16];
++        new Random().nextBytes(buf);
++        return Collections.singletonMap(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
++    }
++
      protected File tmpFile(int version) throws IOException
      {
          File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
--        logFile.deleteOnExit();
          assert logFile.length() == 0;
          return logFile;
      }
@@@ -397,9 -394,9 +417,9 @@@
          File logFile = tmpFile(desc.version);
          CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
          // Change id to match file.
 -        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
 +        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext());
          ByteBuffer buf = ByteBuffer.allocate(1024);
--        CommitLogDescriptor.writeHeader(buf, desc);
++        CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(desc.getEncryptionContext()));
          try (OutputStream lout = new FileOutputStream(logFile))
          {
              lout.write(buf.array(), 0, buf.position());
@@@ -440,11 -437,11 +460,8 @@@
  
      protected void runExpecting(Callable<Void> r, Class<?> expected)
      {
--        JVMStabilityInspector.Killer originalKiller;
--        KillerForTests killerForTests;
--
--        killerForTests = new KillerForTests();
--        originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
++        KillerForTests killerForTests = new KillerForTests();
++        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
  
          Throwable caught = null;
          try
@@@ -466,21 -463,8 +483,23 @@@
  
      protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
      {
++        ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression();
++        EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
          runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected);
-         runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected);
 -        runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected);
++        runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected);
 +    }
 +
 +    protected void testRecovery(byte[] logData) throws Exception
 +    {
 +        Pair<File, Integer> pair = tmpFile();
 +        try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw"))
 +        {
 +            raf.seek(pair.right);
 +            raf.write(logData);
 +            raf.close();
 +
 +            CommitLog.instance.recover(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
 +        }
      }
  
      @Test
@@@ -489,7 -473,7 +508,6 @@@
          boolean originalState = DatabaseDescriptor.isAutoSnapshot();
          try
          {
--            CommitLog.instance.resetUnsafe(true);
              boolean prev = DatabaseDescriptor.isAutoSnapshot();
              DatabaseDescriptor.setAutoSnapshot(false);
              ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
@@@ -549,183 -532,5 +566,103 @@@
              DatabaseDescriptor.setAutoSnapshot(originalState);
          }
      }
 +
 +    @Test
-     public void replay_StandardMmapped() throws IOException
-     {
-         ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
-         EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
-         try
-         {
-             DatabaseDescriptor.setCommitLogCompression(null);
-             DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
-             CommitLog.instance.resetUnsafe(true);
-             replaySimple(CommitLog.instance);
-             replayWithDiscard(CommitLog.instance);
-         }
-         finally
-         {
-             DatabaseDescriptor.setCommitLogCompression(originalCompression);
-             DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
-             CommitLog.instance.resetUnsafe(true);
-         }
-     }
- 
-     @Test
-     public void replay_Compressed_LZ4() throws IOException
-     {
-         replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()));
-     }
- 
-     @Test
-     public void replay_Compressed_Snappy() throws IOException
-     {
-         replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()));
-     }
- 
-     @Test
-     public void replay_Compressed_Deflate() throws IOException
-     {
-         replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
-     }
- 
-     private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
-     {
-         ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
-         EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
-         try
-         {
-             DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
-             DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
-             CommitLog.instance.resetUnsafe(true);
- 
-             replaySimple(CommitLog.instance);
-             replayWithDiscard(CommitLog.instance);
-         }
-         finally
-         {
-             DatabaseDescriptor.setCommitLogCompression(originalCompression);
-             DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
-             CommitLog.instance.resetUnsafe(true);
-         }
-     }
- 
-     @Test
-     public void replay_Encrypted() throws IOException
-     {
-         ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
-         EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
-         try
-         {
-             DatabaseDescriptor.setCommitLogCompression(null);
-             DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
-             CommitLog.instance.resetUnsafe(true);
- 
-             replaySimple(CommitLog.instance);
-             replayWithDiscard(CommitLog.instance);
-         }
-         finally
-         {
-             DatabaseDescriptor.setCommitLogCompression(originalCompression);
-             DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
-             CommitLog.instance.resetUnsafe(true);
-         }
-     }
- 
-     private void replaySimple(CommitLog commitLog) throws IOException
++    public void replaySimple() throws IOException
 +    {
 +        int cellCount = 0;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
-         commitLog.add(rm1);
++        CommitLog.instance.add(rm1);
 +
 +        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
-         commitLog.add(rm2);
++        CommitLog.instance.add(rm2);
 +
-         commitLog.sync(true);
++        CommitLog.instance.sync(true);
 +
-         Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
-         List<String> activeSegments = commitLog.getActiveSegmentNames();
++        Replayer replayer = new Replayer(CommitLog.instance, ReplayPosition.NONE);
++        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
-         File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++        File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.recover(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
-     private void replayWithDiscard(CommitLog commitLog) throws IOException
++    @Test
++    public void replayWithDiscard() throws IOException
 +    {
 +        int cellCount = 0;
 +        int max = 1024;
 +        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
 +        ReplayPosition replayPosition = null;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        for (int i = 0; i < max; i++)
 +        {
 +            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
 +                                 .clustering("bytes")
 +                                 .add("val", bytes("this is a string"))
 +                                 .build();
-             ReplayPosition position = commitLog.add(rm1);
++            ReplayPosition position = CommitLog.instance.add(rm1);
 +
 +            if (i == discardPosition)
 +                replayPosition = position;
 +            if (i > discardPosition)
 +            {
 +                cellCount += 1;
 +            }
 +        }
 +
-         commitLog.sync(true);
++        CommitLog.instance.sync(true);
 +
-         Replayer replayer = new Replayer(commitLog, replayPosition);
-         List<String> activeSegments = commitLog.getActiveSegmentNames();
++        Replayer replayer = new Replayer(CommitLog.instance, replayPosition);
++        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
-         File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++        File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.recover(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    class Replayer extends CommitLogReplayer
 +    {
 +        private final ReplayPosition filterPosition;
 +        int cells;
 +        int skipped;
 +
 +        Replayer(CommitLog commitLog, ReplayPosition filterPosition)
 +        {
 +            super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
 +            this.filterPosition = filterPosition;
 +        }
 +
 +        @SuppressWarnings("resource")
 +        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
 +        {
 +            if (entryLocation <= filterPosition.position)
 +            {
 +                // Skip over this mutation.
 +                skipped++;
 +                return;
 +            }
 +
 +            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
 +            Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +            for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
 +                for (Row row : partitionUpdate)
 +                    cells += Iterables.size(row.cells());
 +        }
 +    }
  }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 69764e6,3538bd1..c8a6033
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@@ -100,12 -97,11 +100,12 @@@ public class CommitLogUpgradeTestMake
      public void makeLog() throws IOException, InterruptedException
      {
          CommitLog commitLog = CommitLog.instance;
 -        System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
 +        System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n",
                            mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                           commitLog.encryptionContext.isEnabled() ? "enabled" : "none",
+                           commitLog.configuration.getCompressorName(),
++                          commitLog.configuration.useEncryption(),
                            commitLog.executor.getClass().getSimpleName(),
 -                          randomSize ? " random size" : "");
 +                          randomSize ? "random size" : "");
          final List<CommitlogExecutor> threads = new ArrayList<>();
          ScheduledExecutorService scheduled = startThreads(commitLog, threads);
  


Mime
View raw message