cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ble...@apache.org
Subject [3/4] cassandra git commit: Add backpressure to compressed or encrypted commit log
Date Wed, 16 Mar 2016 17:27:26 GMT
Add backpressure to compressed or encrypted commit log

patch by Ariel Weisberg; reviewed by Benjamin Lerer for CASSANDRA-10971


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee40e3b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee40e3b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee40e3b4

Branch: refs/heads/trunk
Commit: ee40e3b4529aa77d4d83fc3e7073902402cb3753
Parents: 4651ac7
Author: Ariel Weisberg <ariel.weisberg@datastax.com>
Authored: Wed Mar 16 18:20:29 2016 +0100
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Wed Mar 16 18:20:29 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/commitlog/AbstractCommitLogService.java  |   1 +
 .../db/commitlog/CommitLogSegment.java          |  23 +++--
 .../db/commitlog/CommitLogSegmentManager.java   |  11 +-
 .../db/commitlog/CompressedSegment.java         |   4 +-
 .../db/commitlog/EncryptedSegment.java          |   4 +-
 .../db/commitlog/FileDirectSegment.java         |  42 ++++++--
 .../commitlog/CommitLogSegmentManagerTest.java  | 100 ++++++++++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 101 ++++++++++---------
 9 files changed, 220 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ff4e6d..ffd0b24 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.5
 Merged from 3.0:
+ * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971)
  * SSTableExport supports secondary index tables (CASSANDRA-11330)
  * Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
  * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 557bf50..113d1ba 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -89,6 +89,7 @@ public abstract class AbstractCommitLogService
 
                         // sync and signal
                         long syncStarted = System.currentTimeMillis();
+                        //This is a target for Byteman in CommitLogSegmentManagerTest
                         commitLog.sync(shutdown);
                         lastSyncedAt = syncStarted;
                         syncComplete.signalAll();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5e99a07..f2d8f92 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -119,15 +119,26 @@ public abstract class CommitLogSegment
     final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
-    static CommitLogSegment createSegment(CommitLog commitLog)
+    static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
     {
-        CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext) :
-               commitLog.compressor != null ? new CompressedSegment(commitLog) :
-               new MemoryMappedSegment(commitLog);
+        CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) :
+               commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) :
+                                              new MemoryMappedSegment(commitLog);
         segment.writeLogHeader();
         return segment;
     }
 
+    /**
+     * Checks if the segments use a buffer pool.
+     *
+     * @param commitLog the commit log
+     * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
+     */
+    static boolean usesBufferPool(CommitLog commitLog)
+    {
+        return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null;
+    }
+
     static long getNextId()
     {
         return idBase + nextId.getAndIncrement();
@@ -152,7 +163,7 @@ public abstract class CommitLogSegment
         {
             throw new FSWriteError(e, logFile);
         }
-        
+
         buffer = createBuffer(commitLog);
     }
 
@@ -276,7 +287,7 @@ public abstract class CommitLogSegment
         // Note: Even if the very first allocation of this sync section failed, we still want to enter this
         // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
         // this will always be entered when a mutation allocation has been attempted after the marker allocation
-        // succeeded in the previous sync. 
+        // succeeded in the previous sync.
         assert buffer != null;  // Only close once.
 
         int startMarker = lastSyncedOffset;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index acc93c9..c4357bd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -114,11 +114,11 @@ public class CommitLogSegmentManager
                         if (task == null)
                         {
                             // if we have no more work to do, check if we should create a new segment
-                            if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                            if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
                             {
                                 logger.trace("No segments in reserve; creating a fresh one");
                                 // TODO : some error handling in case we fail to create a new segment
-                                availableSegments.add(CommitLogSegment.createSegment(commitLog));
+                                availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager()));
                                 hasAvailableSegments.signalAll();
                             }
 
@@ -163,6 +163,12 @@ public class CommitLogSegmentManager
                     }
                 }
             }
+
+            private boolean atSegmentLimit()
+            {
+                return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit();
+            }
+
         };
 
         run = true;
@@ -553,5 +559,6 @@ public class CommitLogSegmentManager
     {
         return Collections.unmodifiableCollection(activeSegments);
     }
+
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 6b25ab7..573428a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -44,9 +44,9 @@ public class CompressedSegment extends FileDirectSegment
     /**
      * Constructs a new segment file.
      */
-    CompressedSegment(CommitLog commitLog)
+    CompressedSegment(CommitLog commitLog, Runnable onClose)
     {
-        super(commitLog);
+        super(commitLog, onClose);
         this.compressor = commitLog.compressor;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 46969ac..731dea4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment
     private final EncryptionContext encryptionContext;
     private final Cipher cipher;
 
-    public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext)
+    public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose)
     {
-        super(commitLog);
+        super(commitLog, onClose);
         this.encryptionContext = encryptionContext;
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 75a7fc0..ec4aa91 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.FileUtils;
 
 /**
@@ -51,11 +51,19 @@ public abstract class FileDirectSegment extends CommitLogSegment
      */
     static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
 
+    /**
+     * The number of buffers in use
+     */
+    private static AtomicInteger usedBuffers = new AtomicInteger(0);
+
     volatile long lastWrittenPos = 0;
 
-    FileDirectSegment(CommitLog commitLog)
+    private final Runnable onClose;
+
+    FileDirectSegment(CommitLog commitLog, Runnable onClose)
     {
         super(commitLog);
+        this.onClose = onClose;
     }
 
     void writeLogHeader()
@@ -74,6 +82,7 @@ public abstract class FileDirectSegment extends CommitLogSegment
 
     ByteBuffer createBuffer(BufferType bufferType)
     {
+        usedBuffers.incrementAndGet();
         ByteBuffer buf = bufferPool.poll();
         if (buf != null)
         {
@@ -87,16 +96,35 @@ public abstract class FileDirectSegment extends CommitLogSegment
     @Override
     protected void internalClose()
     {
-        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
-            bufferPool.add(buffer);
-        else
-            FileUtils.clean(buffer);
+        usedBuffers.decrementAndGet();
 
-        super.internalClose();
+        try
+        {
+            if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+                bufferPool.add(buffer);
+            else
+                FileUtils.clean(buffer);
+            super.internalClose();
+        }
+        finally
+        {
+            onClose.run();
+        }
     }
 
     static void shutdown()
     {
         bufferPool.clear();
     }
+
+    /**
+     * Checks if the number of buffers in use is greater or equals to the maximum number of buffers allowed in the pool.
+     *
+     * @return <code>true</code> if the number of buffers in use is greater or equals to the maximum number of buffers
+     * allowed in the pool, <code>false</code> otherwise.
+     */
+    static boolean hasReachedPoolLimit()
+    {
+        return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
new file mode 100644
index 0000000..b5c2f41
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -0,0 +1,100 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import javax.naming.ConfigurationException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+
+@RunWith(BMUnitRunner.class)
+public class CommitLogSegmentManagerTest
+{
+    //Block commit log service from syncing
+    private static final Semaphore allowSync = new Semaphore(0);
+
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "Standard1";
+    private static final String STANDARD2 = "Standard2";
+
+    private final static byte[] entropy = new byte[1024 * 256];
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        new Random().nextBytes(entropy);
+        DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+        DatabaseDescriptor.setCommitLogSegmentSize(1);
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    @Test
+    @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
+            targetClass = "AbstractCommitLogService$1",
+            targetMethod = "run",
+            targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+            action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
+    public void testCompressedCommitLogBackpressure() throws Throwable
+    {
+        CommitLog.instance.resetUnsafe(true);
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                     .clustering("bytes")
+                     .add("val", ByteBuffer.wrap(entropy))
+                     .build();
+
+        Thread dummyThread = new Thread( () ->
+        {
+            for (int i = 0; i < 20; i++)
+                CommitLog.instance.add(m);
+        });
+        dummyThread.start();
+
+        CommitLogSegmentManager clsm = CommitLog.instance.allocator;
+
+        //Protect against delay, but still break out as fast as possible
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < 5000)
+        {
+            if (clsm.getActiveSegments().size() >= 3)
+                break;
+        }
+        Thread.sleep(1000);
+
+        //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
+        Assert.assertEquals(3, clsm.getActiveSegments().size());
+
+        clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));
+
+        Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 91a25e1..b5cbf8b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -18,20 +18,9 @@
 */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.CRC32;
@@ -39,19 +28,17 @@ import java.util.zip.Checksum;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
+import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.SerializationHelper;
@@ -61,22 +48,13 @@ import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.EncryptionContextGenerator;
-import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -106,7 +84,7 @@ public class CommitLogTest
     }
 
     @Before
-    public void setup()
+    public void setup() throws IOException
     {
         logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
         new File(logDirectory).mkdirs();
@@ -575,11 +553,22 @@ public class CommitLogTest
     @Test
     public void replay_StandardMmapped() throws IOException
     {
-        DatabaseDescriptor.setCommitLogCompression(null);
-        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
-        CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
-        replaySimple(commitLog);
-        replayWithDiscard(commitLog);
+        ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+        EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
+        try
+        {
+            DatabaseDescriptor.setCommitLogCompression(null);
+            DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+            CommitLog.instance.resetUnsafe(true);
+            replaySimple(CommitLog.instance);
+            replayWithDiscard(CommitLog.instance);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitLogCompression(originalCompression);
+            DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+            CommitLog.instance.resetUnsafe(true);
+        }
     }
 
     @Test
@@ -602,29 +591,44 @@ public class CommitLogTest
 
     private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
     {
-        DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
-        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
-        CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
-        replaySimple(commitLog);
-        replayWithDiscard(commitLog);
+        ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+        EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
+        try
+        {
+            DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
+            DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+            CommitLog.instance.resetUnsafe(true);
+
+            replaySimple(CommitLog.instance);
+            replayWithDiscard(CommitLog.instance);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitLogCompression(originalCompression);
+            DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+            CommitLog.instance.resetUnsafe(true);
+        }
     }
 
     @Test
     public void replay_Encrypted() throws IOException
     {
-        DatabaseDescriptor.setCommitLogCompression(null);
-        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
-        CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
-
+        ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+        EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
         try
         {
-            replaySimple(commitLog);
-            replayWithDiscard(commitLog);
+            DatabaseDescriptor.setCommitLogCompression(null);
+            DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
+            CommitLog.instance.resetUnsafe(true);
+
+            replaySimple(CommitLog.instance);
+            replayWithDiscard(CommitLog.instance);
         }
         finally
         {
-            for (String file : commitLog.getActiveSegmentNames())
-                FileUtils.delete(new File(commitLog.location, file));
+            DatabaseDescriptor.setCommitLogCompression(originalCompression);
+            DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+            CommitLog.instance.resetUnsafe(true);
         }
     }
 
@@ -706,6 +710,7 @@ public class CommitLogTest
             this.filterPosition = filterPosition;
         }
 
+        @SuppressWarnings("resource")
         void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException
         {
             if (entryLocation <= filterPosition.position)


Mime
View raw message