cassandra-commits mailing list archives

From bened...@apache.org
Subject [2/3] cassandra git commit: Compressed Commit Log
Date Tue, 24 Mar 2015 23:43:02 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 8e1cc17..af415a2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -37,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.*;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +45,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -64,10 +62,9 @@ public class CommitLogSegmentManager
     static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class);
 
     /**
-     * Queue of work to be done by the manager thread.  This is usually a recycle operation, which returns
-     * a CommitLogSegment, or a delete operation, which returns null.
+     * Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
      */
-    private final BlockingQueue<Callable<CommitLogSegment>> segmentManagementTasks = new LinkedBlockingQueue<>();
+    private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
 
     /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
     private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
@@ -96,9 +93,11 @@ public class CommitLogSegmentManager
 
     private Thread managerThread;
     private volatile boolean run = true;
+    private final CommitLog commitLog;
 
-    public CommitLogSegmentManager()
+    public CommitLogSegmentManager(final CommitLog commitLog)
     {
+        this.commitLog = commitLog;
         start();
     }
 
@@ -113,7 +112,7 @@ public class CommitLogSegmentManager
                 {
                     try
                     {
-                        Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+                        Runnable task = segmentManagementTasks.poll();
                         if (task == null)
                         {
                             // if we have no more work to do, check if we should create a new segment
@@ -122,7 +121,7 @@ public class CommitLogSegmentManager
                                 logger.debug("No segments in reserve; creating a fresh one");
                                 size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
                                 // TODO : some error handling in case we fail to create a new segment
-                                availableSegments.add(CommitLogSegment.freshSegment());
+                                availableSegments.add(CommitLogSegment.createSegment(commitLog));
                                 hasAvailableSegments.signalAll();
                             }
 
@@ -155,13 +154,7 @@ public class CommitLogSegmentManager
                             }
                         }
 
-                        CommitLogSegment recycled = task.call();
-                        if (recycled != null)
-                        {
-                            // if the work resulted in a segment to recycle, publish it
-                            availableSegments.add(recycled);
-                            hasAvailableSegments.signalAll();
-                        }
+                        task.run();
                     }
                     catch (Throwable t)
                     {
@@ -242,19 +235,19 @@ public class CommitLogSegmentManager
                 {
                     // Now we can run the user defined command just after switching to the new commit log.
                     // (Do this here instead of in the recycle call so we can get a head start on the archive.)
-                    CommitLog.instance.archiver.maybeArchive(old);
+                    commitLog.archiver.maybeArchive(old);
 
                     // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
                     old.discardUnusedTail();
                 }
 
                 // request that the CL be synced out-of-band, as we've finished a segment
-                CommitLog.instance.requestExtraSync();
+                commitLog.requestExtraSync();
                 return;
             }
 
             // no more segments, so register to receive a signal when not empty
-            WaitQueue.Signal signal = hasAvailableSegments.register(CommitLog.instance.metrics.waitingOnSegmentAllocation.time());
+            WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
 
             // trigger the management thread; this must occur after registering
             // the signal to ensure we are woken by any new segment creation
@@ -283,13 +276,7 @@ public class CommitLogSegmentManager
     private void wakeManager()
     {
         // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
-        segmentManagementTasks.add(new Callable<CommitLogSegment>()
-        {
-            public CommitLogSegment call()
-            {
-                return null;
-            }
-        });
+        segmentManagementTasks.add(Runnables.doNothing());
     }
 
     /**
@@ -354,28 +341,10 @@ public class CommitLogSegmentManager
      */
     void recycleSegment(final CommitLogSegment segment)
     {
-        boolean archiveSuccess = CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName());
+        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
         activeSegments.remove(segment);
-        if (!archiveSuccess)
-        {
-            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
-            discardSegment(segment, false);
-            return;
-        }
-        if (isCapExceeded())
-        {
-            discardSegment(segment, true);
-            return;
-        }
-
-        logger.debug("Recycling {}", segment);
-        segmentManagementTasks.add(new Callable<CommitLogSegment>()
-        {
-            public CommitLogSegment call()
-            {
-                return segment.recycle();
-            }
-        });
+        // if archiving (command) was not successful, leave the file alone for the archiver to deal with
+        // (delete == false); otherwise delete it, since segments are no longer recycled
+        discardSegment(segment, archiveSuccess);
     }
 
     /**
@@ -386,25 +355,9 @@ public class CommitLogSegmentManager
      */
     void recycleSegment(final File file)
     {
-        if (isCapExceeded()
-            || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
-        {
-            // (don't decrease managed size, since this was never a "live" segment)
-            logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
-            FileUtils.deleteWithConfirm(file);
-            return;
-        }
-
-        logger.debug("Recycling {}", file);
-        // this wasn't previously a live segment, so add it to the managed size when we make it live
-        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-        segmentManagementTasks.add(new Callable<CommitLogSegment>()
-        {
-            public CommitLogSegment call()
-            {
-                return new CommitLogSegment(file.getPath());
-            }
-        });
+        // (don't decrease managed size, since this was never a "live" segment)
+        logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
+        FileUtils.deleteWithConfirm(file);
     }
 
     /**
@@ -417,14 +370,13 @@ public class CommitLogSegmentManager
         logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
         size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
 
-        segmentManagementTasks.add(new Callable<CommitLogSegment>()
+        segmentManagementTasks.add(new Runnable()
         {
-            public CommitLogSegment call()
+            public void run()
             {
                 segment.close();
                 if (deleteFile)
                     segment.delete();
-                return null;
             }
         });
     }
@@ -449,16 +401,6 @@ public class CommitLogSegmentManager
         return false;
     }
 
-    /**
-     * Check to see if the speculative current size exceeds the cap.
-     *
-     * @return true if cap is exceeded
-     */
-    private boolean isCapExceeded()
-    {
-        return unusedCapacity() < 0;
-    }
-
     private long unusedCapacity()
     {
         long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
@@ -586,7 +528,7 @@ public class CommitLogSegmentManager
     public void shutdown()
     {
         run = false;
-        segmentManagementTasks.add(Callables.<CommitLogSegment>returning(null));
+        wakeManager();
     }
 
     /**
@@ -595,6 +537,14 @@ public class CommitLogSegmentManager
     public void awaitTermination() throws InterruptedException
     {
         managerThread.join();
+
+        for (CommitLogSegment segment : activeSegments)
+            segment.close();
+
+        for (CommitLogSegment segment : availableSegments)
+            segment.close();
+
+        CompressedSegment.shutdown();
     }
 
     /**

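The main change in this file is the switch of the manager queue from Callable<CommitLogSegment> to plain Runnable: segments are no longer published by returning them from a queued task, so waking the manager thread only needs a no-op (Guava's Runnables.doNothing()). A minimal sketch of the pattern, with illustrative names rather than Cassandra's:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the manager-thread wake pattern: work items are plain Runnables,
// and a no-op Runnable is enough to make the loop re-check whether a fresh
// segment is needed. Names are illustrative, not Cassandra's.
class ManagerLoopSketch
{
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile boolean run = true;

    void loop() throws InterruptedException
    {
        while (run)
        {
            // the real loop polls, creates reserve segments, then waits;
            // take() is enough to show the wake-up mechanics
            Runnable task = tasks.take();
            task.run();
        }
    }

    void wake()
    {
        tasks.add(new Runnable() { public void run() {} }); // no-op, purely a wake-up
    }
}
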
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
new file mode 100644
index 0000000..f3a80bc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
+import org.apache.cassandra.io.util.FileUtils;
+
+/*
+ * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync, compresses the
+ * written section of the buffer and writes it to the destination channel.
+ */
+public class CompressedSegment extends CommitLogSegment
+{
+    private static final ThreadLocal<WrappedByteBuffer> compressedBufferHolder = new ThreadLocal<WrappedByteBuffer>() {
+        protected WrappedByteBuffer initialValue()
+        {
+            return new WrappedByteBuffer(ByteBuffer.allocate(0));
+        }
+    };
+    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Maximum number of buffers in the compression pool. The default value is 3; it should not be set lower than that
+     * (one segment in compression, one written to, one in reserve). Delays in compression may cause the log to use
+     * more buffers, depending on how soon the sync policy stops all writing threads.
+     */
+    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
+
+    static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
+    final ICompressor compressor;
+
+    /**
+     * Constructs a new segment file.
+     */
+    CompressedSegment(CommitLog commitLog)
+    {
+        super(commitLog);
+        this.compressor = commitLog.compressor;
+        try
+        {
+            channel.write((ByteBuffer) buffer.duplicate().flip());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    static ByteBuffer allocate(ICompressor compressor, int size)
+    {
+        if (compressor.useDirectOutputByteBuffers())
+            return ByteBuffer.allocateDirect(size);
+        else
+            return ByteBuffer.allocate(size);
+    }
+    
+    ByteBuffer allocate(int size)
+    {
+        return allocate(compressor, size);
+    }
+
+    ByteBuffer createBuffer(CommitLog commitLog)
+    {
+        ByteBuffer buf = bufferPool.poll();
+        if (buf == null)
+        {
+            // this.compressor is not yet set at this point, so we must use the commitLog's compressor.
+            buf = allocate(commitLog.compressor, DatabaseDescriptor.getCommitLogSegmentSize());
+        }
+        else
+            buf.clear();
+        return buf;
+    }
+
+    static long startMillis = System.currentTimeMillis();
+
+    @Override
+    void write(int startMarker, int nextMarker)
+    {
+        int contentStart = startMarker + SYNC_MARKER_SIZE;
+        int length = nextMarker - contentStart;
+        // The length may be 0 when the segment is being closed.
+        assert length > 0 || (length == 0 && !isStillAllocating());
+
+        try
+        {
+            int compressedLength = compressor.initialCompressedBufferLength(length);
+            WrappedByteBuffer wrappedCompressedBuffer = compressedBufferHolder.get();
+            ByteBuffer compressedBuffer = wrappedCompressedBuffer.buffer;
+            if (compressedBuffer.isDirect() != compressor.useDirectOutputByteBuffers() ||
+                compressedBuffer.capacity() < compressedLength + COMPRESSED_MARKER_SIZE)
+            {
+                compressedBuffer = allocate(compressedLength + COMPRESSED_MARKER_SIZE);
+                FileUtils.clean(wrappedCompressedBuffer.buffer);
+                wrappedCompressedBuffer.buffer = compressedBuffer;
+            }
+
+            ByteBuffer inputBuffer = buffer.duplicate();
+            inputBuffer.limit(contentStart + length).position(contentStart);
+            compressedBuffer.limit(compressedBuffer.capacity()).position(COMPRESSED_MARKER_SIZE);
+            compressedLength = compressor.compress(inputBuffer, wrappedCompressedBuffer);
+
+            compressedBuffer.position(0);
+            compressedBuffer.limit(COMPRESSED_MARKER_SIZE + compressedLength);
+            compressedBuffer.putInt(SYNC_MARKER_SIZE, length);
+
+            // Only one thread can be here at a given time.
+            // Protected by synchronization on CommitLogSegment.sync().
+            writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
+            channel.write(compressedBuffer);
+            channel.force(true);
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    @Override
+    protected void internalClose()
+    {
+        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+            bufferPool.add(buffer);
+        else
+            FileUtils.clean(buffer);
+
+        super.internalClose();
+    }
+
+    static void shutdown()
+    {
+        bufferPool.clear();
+    }
+}
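
Per write() above, each synced section of a compressed segment is framed on disk as a sync marker, a 4-byte uncompressed length at offset SYNC_MARKER_SIZE, and then the compressed payload (hence COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4). A hedged sketch of walking that framing when reading a section back; the 8-byte marker size and the single-read assumption are mine, since CommitLogSegment and the replay path are not shown in this diff:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Illustrative reader for one compressed commit log section, assuming the
// framing produced by CompressedSegment.write():
//   [sync marker (SYNC_MARKER_SIZE bytes)][int uncompressedLength][compressed payload]
// SYNC_MARKER_SIZE = 8 is an assumption, not taken from this diff.
class CompressedSectionSketch
{
    static final int SYNC_MARKER_SIZE = 8;
    static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;

    static ByteBuffer readPayload(FileChannel channel, long sectionOffset, int sectionLength) throws IOException
    {
        ByteBuffer section = ByteBuffer.allocate(sectionLength);
        channel.read(section, sectionOffset); // single read assumed for brevity
        section.flip();

        int uncompressedLength = section.getInt(SYNC_MARKER_SIZE); // length prefix written by write()
        section.position(COMPRESSED_MARKER_SIZE);                  // skip marker + length prefix
        ByteBuffer compressed = section.slice();                   // remainder is the compressed payload

        // a real reader would hand `compressed` to the configured ICompressor's
        // uncompress path, with uncompressedLength sizing the output buffer
        return compressed;
    }
}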

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
new file mode 100644
index 0000000..98b9abb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CLibrary;
+
+/*
+ * Memory-mapped segment. Maps the destination channel into an appropriately-sized memory-mapped buffer in which the
+ * mutation threads write. On sync, forces the buffer to disk.
+ * If possible, recycles used segment files to avoid reallocating large chunks of disk.
+ */
+public class MemoryMappedSegment extends CommitLogSegment
+{
+    /**
+     * Constructs a new segment file.
+     *
+     * @param commitLog the commit log it will be used with.
+     */
+    MemoryMappedSegment(CommitLog commitLog)
+    {
+        super(commitLog);
+        // mark the initial sync marker as uninitialised
+        int firstSync = buffer.position();
+        buffer.putInt(firstSync + 0, 0);
+        buffer.putInt(firstSync + 4, 0);
+    }
+
+    ByteBuffer createBuffer(CommitLog commitLog)
+    {
+        try
+        {
+            // Extend the file size to the standard segment size.
+            // NOTE: while we're using RAF to easily adjust file size, we need to avoid using RAF
+            // for grabbing the FileChannel due to FILE_SHARE_DELETE flag bug on windows.
+            // See: https://bugs.openjdk.java.net/browse/JDK-6357433 and CASSANDRA-8308
+            try (RandomAccessFile raf = new RandomAccessFile(logFile, "rw"))
+            {
+                raf.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, logFile);
+            }
+
+            return channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, logFile);
+        }
+    }
+
+    @Override
+    void write(int startMarker, int nextMarker)
+    {
+        // if there's room in the discard section to write an empty header,
+        // zero out the next sync marker so replayer can cleanly exit
+        if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+        {
+            buffer.putInt(nextMarker, 0);
+            buffer.putInt(nextMarker + 4, 0);
+        }
+
+        // write previous sync marker to point to next sync marker
+        // we don't chain the crcs here to ensure this method is idempotent if it fails
+        writeSyncMarker(buffer, startMarker, startMarker, nextMarker);
+
+        try
+        {
+            ((MappedByteBuffer) buffer).force();
+        }
+        catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
+        {
+            throw new FSWriteError(e, getPath());
+        }
+        CLibrary.trySkipCache(fd, startMarker, nextMarker);
+    }
+
+    @Override
+    protected void internalClose()
+    {
+        if (FileUtils.isCleanerAvailable())
+            FileUtils.clean(buffer);
+        super.internalClose();
+    }
+}
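
MemoryMappedSegment keeps the write path the old commit log used: mutations land in a shared MappedByteBuffer and sync is a force() of the buffer (plus a page-cache drop hint via CLibrary.trySkipCache). A small sketch of the map/write/force cycle; note that, per the comment in createBuffer() above, the production code deliberately avoids taking the FileChannel from RandomAccessFile because of a Windows bug (CASSANDRA-8308), which this sketch ignores:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of the map/write/force cycle behind MemoryMappedSegment: pre-size the
// file, map it READ_WRITE, write through the mapping, then force() to make the
// bytes durable. force() can throw an undeclared IOException, hence the wrap.
class MmapWriteSketch
{
    static void writeAndSync(String path, byte[] data) throws IOException
    {
        try (RandomAccessFile raf = new RandomAccessFile(path, "rw"))
        {
            raf.setLength(data.length); // extend to the full size up front, as the segment code does
            MappedByteBuffer map = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, data.length);
            map.put(data);              // mutation threads write into the mapping
            try
            {
                map.force();            // the sync step: flush the dirty pages to disk
            }
            catch (Exception e)
            {
                throw new IOException("mmap force failed for " + path, e);
            }
        }
    }
}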

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 14bb367..86a248b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -22,7 +22,6 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 class PeriodicCommitLogService extends AbstractCommitLogService
 {
-
     private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);
 
     public PeriodicCommitLogService(final CommitLog commitLog)
@@ -39,7 +38,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
             pending.incrementAndGet();
             while (waitForSyncToCatchUp(started))
             {
-                WaitQueue.Signal signal = syncComplete.register(CommitLog.instance.metrics.waitingOnCommit.time());
+                WaitQueue.Signal signal = syncComplete.register(commitLog.metrics.waitingOnCommit.time());
                 if (waitForSyncToCatchUp(started))
                     signal.awaitUninterruptibly();
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
index 88ca396..d5db130 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
@@ -21,20 +21,21 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Collections;
 import java.util.AbstractSet;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-import org.apache.cassandra.config.CFMetaData;
+
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ParametrizedClass;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
@@ -144,7 +145,7 @@ public class CompressionParameters
         return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength;
     }
 
-    private static Class<? extends ICompressor> parseCompressorClass(String className) throws ConfigurationException
+    private static Class<?> parseCompressorClass(String className) throws ConfigurationException
     {
         if (className == null || className.isEmpty())
             return null;
@@ -152,7 +153,7 @@ public class CompressionParameters
         className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
         try
         {
-            return (Class<? extends ICompressor>)Class.forName(className);
+            return Class.forName(className);
         }
         catch (Exception e)
         {
@@ -160,7 +161,7 @@ public class CompressionParameters
         }
     }
 
-    private static ICompressor createCompressor(Class<? extends ICompressor> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException
+    private static ICompressor createCompressor(Class<?> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException
     {
         if (compressorClass == null)
         {
@@ -206,6 +207,10 @@ public class CompressionParameters
         }
     }
 
+    public static ICompressor createCompressor(ParametrizedClass compression) throws ConfigurationException
+    {
+        return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters));
+    }
+
     private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co)
     {
         if (co == null || co.isEmpty())

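The new public createCompressor(ParametrizedClass) overload is what lets the yaml-level commitlog_compression setting reuse the table-compression machinery: the class name is resolved (bare names default to the org.apache.cassandra.io.compress package, per parseCompressorClass above) and the parameters map is handed to the compressor's factory. A hedged sketch of the resolve-then-instantiate pattern; the static create(Map) factory convention is an assumption here, as the relevant body falls outside this hunk:

import java.lang.reflect.Method;
import java.util.Map;

// Illustrative version of the resolve-and-create path behind
// createCompressor(ParametrizedClass). The static create(Map) factory
// convention is assumed, not shown in this hunk.
class CompressorFactorySketch
{
    static Object create(String className, Map<String, String> options) throws Exception
    {
        // bare class names are resolved against the built-in compressor package
        String fqcn = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className;
        Class<?> clazz = Class.forName(fqcn);
        Method factory = clazz.getMethod("create", Map.class);
        return factory.invoke(null, options); // null receiver: static factory method
    }
}
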
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index bede4da..a88e4d2 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -77,7 +77,7 @@ public class DeflateCompressor implements ICompressor
 
         Deflater def = deflater.get();
         def.reset();
-        def.setInput(src.array(), src.position(), src.limit());
+        def.setInput(src.array(), src.arrayOffset() + src.position(), src.remaining());
         def.finish();
         if (def.needsInput())
             return 0;

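This one-line fix corrects two classic ByteBuffer mistakes at once: for an array-backed buffer obtained via slice() or duplicate(), the backing-array index is arrayOffset() + position(), not position(); and Deflater.setInput takes a length, so src.remaining() is correct where src.limit() was not. A self-contained demonstration of the offset half of the bug:

import java.nio.ByteBuffer;

// Demonstrates why array-backed buffer access must add arrayOffset():
// a slice's position 0 maps to backing-array index arrayOffset(), not 0.
public class ArrayOffsetDemo
{
    public static void main(String[] args)
    {
        ByteBuffer whole = ByteBuffer.allocate(16);
        for (int i = 0; i < 16; i++)
            whole.put((byte) i);
        whole.position(4).limit(12);

        ByteBuffer slice = whole.slice(); // independent view: arrayOffset() == 4
        // correct: index into the shared backing array includes the offset
        byte viaOffset = slice.array()[slice.arrayOffset() + slice.position()]; // == 4
        // buggy (pre-fix pattern): ignores arrayOffset, reads the wrong byte
        byte viaPosition = slice.array()[slice.position()];                     // == 0

        System.out.println(viaOffset + " vs " + viaPosition); // prints "4 vs 0"
    }
}
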
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
new file mode 100644
index 0000000..2d36d54
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ByteBufferDataInput extends AbstractDataInput implements FileDataInput, DataInput
+{
+    private final ByteBuffer buffer;
+    private final String filename;
+    private final long segmentOffset;
+    private int position;
+
+    public ByteBufferDataInput(ByteBuffer buffer, String filename, long segmentOffset, int position)
+    {
+        assert buffer != null;
+        this.buffer = buffer;
+        this.filename = filename;
+        this.segmentOffset = segmentOffset;
+        this.position = position;
+    }
+
+    // Only use when we know the seek is within the mapped segment; throws an
+    // IOException otherwise.
+    public void seek(long pos) throws IOException
+    {
+        long inSegmentPos = pos - segmentOffset;
+        if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
+            throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
+
+        position = (int) inSegmentPos;
+    }
+
+    public long getFilePointer()
+    {
+        return segmentOffset + position;
+    }
+
+    public long getPosition()
+    {
+        return segmentOffset + position;
+    }
+
+    public long getPositionLimit()
+    {
+        return segmentOffset + buffer.capacity();
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+
+    public void reset(FileMark mark) throws IOException
+    {
+        assert mark instanceof MappedFileDataInputMark;
+        position = ((MappedFileDataInputMark) mark).position;
+    }
+
+    public FileMark mark()
+    {
+        return new MappedFileDataInputMark(position);
+    }
+
+    public long bytesPastMark(FileMark mark)
+    {
+        assert mark instanceof MappedFileDataInputMark;
+        assert position >= ((MappedFileDataInputMark) mark).position;
+        return position - ((MappedFileDataInputMark) mark).position;
+    }
+
+    public boolean isEOF() throws IOException
+    {
+        return position == buffer.capacity();
+    }
+
+    public long bytesRemaining() throws IOException
+    {
+        return buffer.capacity() - position;
+    }
+
+    public String getPath()
+    {
+        return filename;
+    }
+
+    public int read() throws IOException
+    {
+        if (isEOF())
+            return -1;
+        return buffer.get(position++) & 0xFF;
+    }
+
+    /**
+     * Does the same thing as <code>readFully</code>, but returns the bytes in a new buffer and never
+     * mutates the shared underlying buffer (thread safe). Note the data is copied; see CASSANDRA-3179.
+     * @param length number of bytes to read
+     * @return buffer with the requested portion of the file content
+     * @throws IOException on any failure of the I/O operation
+     */
+    public ByteBuffer readBytes(int length) throws IOException
+    {
+        int remaining = buffer.remaining() - position;
+        if (length > remaining)
+            throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested",
+                                                remaining, length));
+
+        if (length == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        ByteBuffer bytes = buffer.duplicate();
+        bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
+        position += length;
+
+        // we have to copy the data in case we unreference the underlying sstable.  See CASSANDRA-3179
+        ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
+        clone.put(bytes);
+        clone.flip();
+        return clone;
+    }
+
+    @Override
+    public final void readFully(byte[] bytes) throws IOException
+    {
+        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, 0, bytes.length);
+        position += bytes.length;
+    }
+
+    @Override
+    public final void readFully(byte[] bytes, int offset, int count) throws IOException
+    {
+        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, offset, count);
+        position += count;
+    }
+
+    private static class MappedFileDataInputMark implements FileMark
+    {
+        int position;
+
+        MappedFileDataInputMark(int position)
+        {
+            this.position = position;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(" +
+               "filename='" + filename + "'" +
+               ", position=" + position +
+               ")";
+    }
+}
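
ByteBufferDataInput is the old MappedFileDataInput generalized from MappedByteBuffer to any ByteBuffer, which allows the commit log replayer to wrap a decompressed heap buffer just as easily as an mmap'd region (and readFully over a byte range now works instead of throwing). Note that the class never moves the shared buffer's position: single bytes use absolute get(int) and ranged reads go through duplicate(). A short sketch of that idiom:

import java.nio.ByteBuffer;

// Sketch of position-free reads over a shared buffer, as ByteBufferDataInput
// does: absolute get(int) for single bytes, duplicate() for ranges, so readers
// never disturb each other's view of the underlying buffer.
class SharedBufferReads
{
    static ByteBuffer range(ByteBuffer shared, int offset, int length)
    {
        ByteBuffer view = shared.duplicate();         // independent position/limit
        view.position(offset).limit(offset + length); // bound the requested range
        return view.slice();                          // zero-copy window
    }

    static int byteAt(ByteBuffer shared, int offset)
    {
        return shared.get(offset) & 0xFF; // absolute read, no position change
    }
}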

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
deleted file mode 100644
index 82ea6d2..0000000
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class MappedFileDataInput extends AbstractDataInput implements FileDataInput, DataInput
-{
-    private final MappedByteBuffer buffer;
-    private final String filename;
-    private final long segmentOffset;
-    private int position;
-
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, long segmentOffset, int position)
-    {
-        assert buffer != null;
-        this.buffer = buffer;
-        this.filename = filename;
-        this.segmentOffset = segmentOffset;
-        this.position = position;
-    }
-
-    // Only use when we know the seek in within the mapped segment. Throws an
-    // IOException otherwise.
-    public void seek(long pos) throws IOException
-    {
-        long inSegmentPos = pos - segmentOffset;
-        if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
-            throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
-
-        position = (int) inSegmentPos;
-    }
-
-    public long getFilePointer()
-    {
-        return segmentOffset + position;
-    }
-
-    public long getPosition()
-    {
-        return segmentOffset + position;
-    }
-
-    public long getPositionLimit()
-    {
-        return segmentOffset + buffer.capacity();
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return false;
-    }
-
-    public void reset(FileMark mark) throws IOException
-    {
-        assert mark instanceof MappedFileDataInputMark;
-        position = ((MappedFileDataInputMark) mark).position;
-    }
-
-    public FileMark mark()
-    {
-        return new MappedFileDataInputMark(position);
-    }
-
-    public long bytesPastMark(FileMark mark)
-    {
-        assert mark instanceof MappedFileDataInputMark;
-        assert position >= ((MappedFileDataInputMark) mark).position;
-        return position - ((MappedFileDataInputMark) mark).position;
-    }
-
-    public boolean isEOF() throws IOException
-    {
-        return position == buffer.capacity();
-    }
-
-    public long bytesRemaining() throws IOException
-    {
-        return buffer.capacity() - position;
-    }
-
-    public String getPath()
-    {
-        return filename;
-    }
-
-    public int read() throws IOException
-    {
-        if (isEOF())
-            return -1;
-        return buffer.get(position++) & 0xFF;
-    }
-
-    /**
-     * Does the same thing as <code>readFully</code> do but without copying data (thread safe)
-     * @param length length of the bytes to read
-     * @return buffer with portion of file content
-     * @throws IOException on any fail of I/O operation
-     */
-    public ByteBuffer readBytes(int length) throws IOException
-    {
-        int remaining = buffer.remaining() - position;
-        if (length > remaining)
-            throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested",
-                                                remaining, length));
-
-        if (length == 0)
-            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        ByteBuffer bytes = buffer.duplicate();
-        bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
-        position += length;
-
-        // we have to copy the data in case we unreference the underlying sstable.  See CASSANDRA-3179
-        ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
-        clone.put(bytes);
-        clone.flip();
-        return clone;
-    }
-
-    @Override
-    public final void readFully(byte[] bytes) throws IOException
-    {
-        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, 0, bytes.length);
-        position += bytes.length;
-    }
-
-    @Override
-    public final void readFully(byte[] buffer, int offset, int count) throws IOException
-    {
-        throw new UnsupportedOperationException("use readBytes instead");
-    }
-
-    private static class MappedFileDataInputMark implements FileMark
-    {
-        int position;
-
-        MappedFileDataInputMark(int position)
-        {
-            this.position = position;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "(" +
-               "filename='" + filename + "'" +
-               ", position=" + position +
-               ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index eec6581..aa0a10f 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -86,7 +86,7 @@ public class MmappedSegmentedFile extends SegmentedFile
         if (segment.right != null)
         {
             // segment is mmap'd
-            return new MappedFileDataInput(segment.right, path, segment.left, (int) (position - segment.left));
+            return new ByteBufferDataInput(segment.right, path, segment.left, (int) (position - segment.left));
         }
 
         // we can have single cells or partitions larger than 2Gb, which is our maximum addressable range in a single segment;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index 670fa6b..7a9f6e5 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -33,17 +33,23 @@ public class CommitLogMetrics
     public static final MetricNameFactory factory = new DefaultNameFactory("CommitLog");
 
     /** Number of completed tasks */
-    public final Gauge<Long> completedTasks;
+    public Gauge<Long> completedTasks;
     /** Number of pending tasks */
-    public final Gauge<Long> pendingTasks;
+    public Gauge<Long> pendingTasks;
     /** Current size used by all the commit log segments */
-    public final Gauge<Long> totalCommitLogSize;
+    public Gauge<Long> totalCommitLogSize;
     /** Time spent waiting for a CLS to be allocated - under normal conditions this should be zero */
     public final Timer waitingOnSegmentAllocation;
     /** The time spent waiting on CL sync; for Periodic this is only occurs when the sync is lagging its sync interval */
     public final Timer waitingOnCommit;
+    
+    public CommitLogMetrics()
+    {
+        waitingOnSegmentAllocation = Metrics.timer(factory.createMetricName("WaitingOnSegmentAllocation"));
+        waitingOnCommit = Metrics.timer(factory.createMetricName("WaitingOnCommit"));
+    }
 
-    public CommitLogMetrics(final AbstractCommitLogService service, final CommitLogSegmentManager allocator)
+    public void attach(final AbstractCommitLogService service, final CommitLogSegmentManager allocator)
     {
         completedTasks = Metrics.register(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
         {
@@ -66,7 +72,5 @@ public class CommitLogMetrics
                 return allocator.bytesUsed();
             }
         });
-        waitingOnSegmentAllocation = Metrics.timer(factory.createMetricName("WaitingOnSegmentAllocation"));
-        waitingOnCommit = Metrics.timer(factory.createMetricName("WaitingOnCommit"));
     }
 }
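
The gauges above lose their final modifier because they are now assigned in attach() rather than the constructor: the timers have no dependencies and can be created eagerly, while the gauges close over the AbstractCommitLogService and CommitLogSegmentManager, which, with the new non-static CommitLog wiring, presumably do not exist yet when the metrics object is built. A sketch of the two-phase pattern, with a stand-in Gauge type:

import java.util.concurrent.ThreadPoolExecutor;

// Sketch of the construct-then-attach split: dependency-free metrics are
// created eagerly; gauges over late-constructed collaborators are deferred.
// Gauge is a stand-in for the metrics library's interface.
class TwoPhaseMetricsSketch
{
    interface Gauge<T> { T value(); }

    final long createdAt = System.nanoTime(); // eager, no dependencies
    Gauge<Long> pendingTasks;                 // not final: assigned in attach()

    void attach(final ThreadPoolExecutor service)
    {
        pendingTasks = new Gauge<Long>()
        {
            public Long value() { return (long) service.getQueue().size(); }
        };
    }
}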

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 307ca8c..f419fbd 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -7,6 +7,7 @@ memtable_allocation_type: offheap_objects
 commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5
+commitlog_directory: build/test/cassandra/commitlog
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010
@@ -14,7 +15,6 @@ rpc_port: 9170
 start_native_transport: true
 native_transport_port: 9042
 column_index_size_in_kb: 4
-commitlog_directory: build/test/cassandra/commitlog
 saved_caches_directory: build/test/cassandra/saved_caches
 data_file_directories:
     - build/test/cassandra/data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/test/conf/commitlog_compression.yaml
----------------------------------------------------------------------
diff --git a/test/conf/commitlog_compression.yaml b/test/conf/commitlog_compression.yaml
new file mode 100644
index 0000000..9849d7b
--- /dev/null
+++ b/test/conf/commitlog_compression.yaml
@@ -0,0 +1,2 @@
+commitlog_compression:
+    - class_name: LZ4Compressor

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
index dc90804..b4efd49 100644
--- a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
+++ b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
@@ -48,7 +48,7 @@ public class ComitLogStress
             System.out.println("Setting num threads to: " + NUM_THREADS);
         }
         ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60,
-                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory(""), "");
+                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory("Stress"), "");
         ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
 
         org.apache.cassandra.SchemaLoader.loadSchema();
@@ -86,10 +86,12 @@ public class ComitLogStress
         public void run() {
             String ks = "Keyspace1";
             ByteBuffer key = ByteBufferUtil.bytes(keyString);
-            Mutation mutation = new Mutation(ks, key);
-            mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value"),
-                    System.currentTimeMillis());
-            CommitLog.instance.add(mutation);
+            for (int i=0; i<100; ++i) {
+                Mutation mutation = new Mutation(ks, key);
+                mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value" + i),
+                        System.currentTimeMillis());
+                CommitLog.instance.add(mutation);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
new file mode 100644
index 0000000..22dbe11
--- /dev/null
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -0,0 +1,412 @@
+package org.apache.cassandra.db.commitlog;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParametrizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+
+public class CommitLogStressTest
+{
+
+    public static ByteBuffer dataSource;
+    
+    public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
+
+    public static int numCells = 1;
+
+    public static int cellSize = 1024;
+    
+    public static int rateLimit = 0;
+    
+    public static int runTimeMs = 10000;
+    
+    public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
+    
+    public static int hash(int hash, ByteBuffer bytes)
+    {
+        int shift = 0;
+        for (int i=0; i<bytes.limit(); i++) {
+            hash += (bytes.get(i) & 0xFF) << shift;
+            shift = (shift + 8) & 0x1F;
+        }
+        return hash;
+    }
+    
+    public static void main(String[] args) throws Exception {
+        try {
+            if (args.length >= 1) {
+                NUM_THREADS = Integer.parseInt(args[0]);
+                System.out.println("Setting num threads to: " + NUM_THREADS);
+            }
+    
+            if (args.length >= 2) {
+                numCells = Integer.parseInt(args[1]);
+                System.out.println("Setting num cells to: " + numCells);
+            }
+    
+            if (args.length >= 3) {
+                cellSize = Integer.parseInt(args[2]);
+                System.out.println("Setting cell size to: " + cellSize + "; be aware the source corpus may be small");
+            }
+    
+            if (args.length >= 4) {
+                rateLimit = Integer.parseInt(args[3]);
+                System.out.println("Setting per thread rate limit to: " + rateLimit);
+            }
+            initialize();
+            
+            CommitLogStressTest tester = new CommitLogStressTest();
+            tester.testFixedSize();
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace(System.err);
+        }
+        finally {
+            System.exit(0);
+        }
+    }
+    
+    boolean failed = false;
+    volatile boolean stop = false;
+    boolean randomSize = false;
+    boolean discardedRun = false;
+    ReplayPosition discardedPos;
+    
+    @BeforeClass
+    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+    {
+        try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+        {
+            dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
+            while (dataSource.hasRemaining()) {
+                fis.getChannel().read(dataSource);
+            }
+            dataSource.flip();
+        }
+
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
+
+        File dir = new File(location);
+        if (dir.isDirectory())
+        {
+            File[] files = dir.listFiles();
+    
+            for (File f : files)
+                if (!f.delete())
+                    Assert.fail("Failed to delete " + f);
+        } else {
+            dir.mkdir();
+        }
+    }
+
+    @Test
+    public void testRandomSize() throws Exception
+    {
+        randomSize = true;
+        discardedRun = false;
+        testAllLogConfigs();
+    }
+
+    @Test
+    public void testFixedSize() throws Exception
+    {
+        randomSize = false;
+        discardedRun = false;
+
+        testAllLogConfigs();
+    }
+
+    @Test
+    public void testDiscardedRun() throws Exception
+    {
+        discardedRun = true;
+        randomSize = true;
+
+        testAllLogConfigs();
+    }
+
+    public void testAllLogConfigs() throws IOException, InterruptedException
+    {
+        failed = false;
+        DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
+        DatabaseDescriptor.setCommitLogSyncPeriod(30);
+        DatabaseDescriptor.setCommitLogSegmentSize(32);
+        for (ParametrizedClass compressor : new ParametrizedClass[] {
+                null,
+                new ParametrizedClass("LZ4Compressor", null),
+                new ParametrizedClass("SnappyCompressor", null),
+                new ParametrizedClass("DeflateCompressor", null)}) {
+            DatabaseDescriptor.setCommitLogCompression(compressor);
+            for (CommitLogSync sync : CommitLogSync.values())
+            {
+                DatabaseDescriptor.setCommitLogSync(sync);
+                CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver);
+                testLog(commitLog);
+            }
+        }
+        assert !failed;
+    }
+
+    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
+        System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n",
+                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                           commitLog.executor.getClass().getSimpleName(),
+                           randomSize ? " random size" : "",
+                           discardedRun ? " with discarded run" : "");
+        commitLog.allocator.enableReserveSegmentCreation();
+        
+        final List<CommitlogExecutor> threads = new ArrayList<>();
+        ScheduledExecutorService scheduled = startThreads(commitLog, threads);
+
+        discardedPos = ReplayPosition.NONE;
+        if (discardedRun) {
+            // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
+            Thread.sleep(runTimeMs / 3);
+            stop = true;
+            scheduled.shutdown();
+            scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+            for (CommitlogExecutor t: threads)
+            {
+                t.join();
+                CommitLog.instance.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp);
+                if (t.rp.compareTo(discardedPos) > 0)
+                    discardedPos = t.rp;
+            }
+            threads.clear();
+            System.out.format("Discarded at %s\n", discardedPos);
+
+            scheduled = startThreads(commitLog, threads);
+        }
+
+        Thread.sleep(runTimeMs);
+        stop = true;
+        scheduled.shutdown();
+        scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+        int hash = 0;
+        int cells = 0;
+        for (CommitlogExecutor t: threads) {
+            t.join();
+            hash += t.hash;
+            cells += t.cells;
+        }
+        
+        commitLog.shutdownBlocking();
+
+        System.out.print("Stopped. Replaying... "); System.out.flush();
+        Replayer repl = new Replayer();
+        File[] files = new File(location).listFiles();
+        repl.recover(files);
+
+        for (File f : files)
+            if (!f.delete())
+                Assert.fail("Failed to delete " + f);
+        
+        if (hash == repl.hash && cells == repl.cells)
+            System.out.println("Test success.");
+        else
+        {
+            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash);
+            failed = true;
+        }
+    }
+
+    public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
+    {
+        stop = false;
+        for (int ii = 0; ii < NUM_THREADS; ii++) {
+            final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+            threads.add(t);
+            t.start();
+        }
+
+        final long start = System.currentTimeMillis();
+        Runnable printRunnable = new Runnable() {
+            long lastUpdate = 0;
+
+            public void run() {
+                Runtime runtime = Runtime.getRuntime();
+                long maxMemory = mb(runtime.maxMemory());
+                long allocatedMemory = mb(runtime.totalMemory());
+                long freeMemory = mb(runtime.freeMemory());
+                long temp = 0;
+                long sz = 0;
+                for (CommitlogExecutor cle : threads) {
+                    temp += cle.counter.get();
+                    sz += cle.dataSize;
+                }
+                double time = (System.currentTimeMillis() - start) / 1000.0;
+                double avg = temp / time;
+                System.out.println(String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
+                        (System.currentTimeMillis() - start) / 1000,
+                        maxMemory, allocatedMemory, freeMemory, temp - lastUpdate, temp, avg, mb(sz / time)));
+                lastUpdate = temp;
+            }
+        };
+        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+        scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
+        return scheduled;
+    }
+
+    private static long mb(long bytes) {
+        return bytes / (1024 * 1024);
+    }
+
+    private static double mb(double bytes) {
+        return bytes / (1024 * 1024);
+    }
+
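+    // Returns a quantity-byte slice of the shared dataSource buffer,
+    // taken at a random offset.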
+    public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
+        ByteBuffer slice = ByteBuffer.allocate(quantity);
+        ByteBuffer source = dataSource.duplicate();
+        source.position(tlr.nextInt(source.capacity() - quantity));
+        source.limit(source.position() + quantity);
+        slice.put(source);
+        slice.flip();
+        return slice;
+    }
+
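+    // Writer thread: appends random mutations to the commit log until stopped,
+    // tracking a running checksum, cell count, and the last ReplayPosition.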
+    public class CommitlogExecutor extends Thread {
+        final AtomicLong counter = new AtomicLong();
+        int hash = 0;
+        int cells = 0;
+        int dataSize = 0;
+        final CommitLog commitLog;
+
+        volatile ReplayPosition rp;
+
+        public CommitlogExecutor(CommitLog commitLog)
+        {
+            this.commitLog = commitLog;
+        }
+
+        public void run() {
+            RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
+            final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+            while (!stop) {
+                if (rl != null)
+                    rl.acquire();
+                String ks = "Keyspace1";
+                ByteBuffer key = randomBytes(16, tlr);
+                Mutation mutation = new Mutation(ks, key);
+
+                for (int ii = 0; ii < numCells; ii++) {
+                    int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
+                    ByteBuffer bytes = randomBytes(sz, tlr);
+                    mutation.add("Standard1", Util.cellname("name" + ii), bytes,
+                            System.currentTimeMillis());
+                    hash = hash(hash, bytes);
+                    ++cells;
+                    dataSize += sz;
+                }
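+                // Remember the position of the latest write so completed segments
+                // can later be discarded up to this point.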
+                rp = commitLog.add(mutation);
+                counter.incrementAndGet();
+            }
+        }
+    }
+    
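+    // Verification replayer: recomputes the checksum and cell count from the
+    // recovered mutations, skipping anything at or before the discarded position.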
+    class Replayer extends CommitLogReplayer {
+
+        Replayer()
+        {
+            super(discardedPos, null);
+        }
+
+        int hash = 0;
+        int cells = 0;
+
+        @Override
+        void replayMutation(byte[] inputBuffer, int size,
+                final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter)
+        {
+            if (desc.id < discardedPos.segment)
+            {
+                System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
+                return;
+            }
+            else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+            {
+                // Skip over this mutation.
+                return;
+            }
+
+            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+            Mutation mutation;
+            try
+            {
+                mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+                                                               desc.getMessagingVersion(),
+                                                               ColumnSerializer.Flag.LOCAL);
+            }
+            catch (IOException e)
+            {
+                // A mutation that fails to deserialize fails the whole test.
+                throw new AssertionError(e);
+            }
+
+            for (ColumnFamily cf : mutation.getColumnFamilies()) {
+                for (Cell c : cf.getSortedColumns()) {
+                    if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
+                    {
+                        hash = hash(hash, c.value());
+                        ++cells;
+                    }
+                }
+            }
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 2d6e7fd..3073ecd 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -19,13 +19,24 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,16 +46,19 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParametrizedClass;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,8 +66,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.KillerForTests;
 
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 public class CommitLogTest
 {
     private static final String KEYSPACE1 = "CommitLogTest";
@@ -188,7 +200,7 @@ public class CommitLogTest
 
         // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
         Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
-        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 100), 0);
+        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200), 0);
         CommitLog.instance.add(rm2);
         // also forces a new segment, since each entry-with-overhead is just under half the CL size
         CommitLog.instance.add(rm2);
@@ -298,7 +310,7 @@ public class CommitLogTest
 
         Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
 
-        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L).getMessagingVersion());
+        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
         Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
     }
@@ -398,4 +410,48 @@ public class CommitLogTest
         row = command.getRow(notDurableKs);
         Assert.assertEquals(null, row.cf);
     }
+    
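+    // Round-trips a descriptor through writeHeader/readHeader and checks both the
+    // consumed header length and the decoded descriptor.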
+    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+    {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc);
+        long length = buf.position();
+        // Put some extra data in the stream.
+        buf.putDouble(0.1);
+        buf.flip();
+        FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
+        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
+        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
+        Assert.assertEquals("Descriptors", desc, read);
+    }
+    
+    @Test
+    public void testDescriptorPersistence() throws IOException
+    {
+        testDescriptorPersistence(new CommitLogDescriptor(11, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParametrizedClass("LZ4Compressor", null)));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+                new ParametrizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+    }
+
+    @Test
+    public void testDescriptorInvalidParametersSize() throws IOException
+    {
+        Map<String, String> params = new HashMap<>();
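+        // Build a parameter map too large for the descriptor header to encode.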
+        for (int i = 0; i < 65535; ++i)
+            params.put("key" + i, Integer.toString(i, 16));
+        try
+        {
+            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+                                                               21,
+                                                               new ParametrizedClass("LZ4Compressor", params));
+            ByteBuffer buf = ByteBuffer.allocate(1024000);
+            CommitLogDescriptor.writeHeader(buf, desc);
+            Assert.fail("Parameter object too long should fail on writing descriptor.");
+        }
+        catch (ConfigurationException e)
+        {
+            // expected: the parameter map is too large for the descriptor header
+        }
+    }
 }

