cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject cassandra git commit: Make LZ4 compression level configurable
Date Fri, 08 Apr 2016 06:19:41 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e1e692a75 -> e06d411f6


Make LZ4 compression level configurable

Patch by mkjellman; reviewed by marcuse for CASSANDRA-11051


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e06d411f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e06d411f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e06d411f

Branch: refs/heads/trunk
Commit: e06d411f6a6a459503f09fe80208d1ee261ed772
Parents: e1e692a
Author: Michael Kjellman <mkjellman@internalcircle.com>
Authored: Fri Feb 19 09:47:44 2016 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Apr 8 08:09:36 2016 +0200

----------------------------------------------------------------------
 .../cassandra/io/compress/LZ4Compressor.java    | 106 +++++++++++++++++--
 .../cassandra/schema/CompressionParams.java     |   6 +-
 .../io/compress/CompressorPerformance.java      |   3 +-
 .../db/commitlog/SegmentReaderTest.java         |   3 +-
 .../io/compress/CQLCompressionTest.java         |  56 ++++++++++
 .../CompressedSequentialWriterTest.java         |   7 +-
 6 files changed, 166 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 3a3b024..3fd889e 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -23,31 +23,80 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
-import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.Pair;
 
 public class LZ4Compressor implements ICompressor
 {
+    private static final Logger logger = LoggerFactory.getLogger(LZ4Compressor.class);
+
+    public static final String LZ4_FAST_COMPRESSOR = "fast";
+    public static final String LZ4_HIGH_COMPRESSOR = "high";
+    private static final Set<String> VALID_COMPRESSOR_TYPES = new HashSet<>(Arrays.asList(LZ4_FAST_COMPRESSOR, LZ4_HIGH_COMPRESSOR));
+
+    private static final int DEFAULT_HIGH_COMPRESSION_LEVEL = 9;
+    private static final String DEFAULT_LZ4_COMPRESSOR_TYPE = LZ4_FAST_COMPRESSOR;
+
+    public static final String LZ4_HIGH_COMPRESSION_LEVEL = "lz4_high_compressor_level";
+    public static final String LZ4_COMPRESSOR_TYPE = "lz4_compressor_type";
+
     private static final int INTEGER_BYTES = 4;
 
-    @VisibleForTesting
-    public static final LZ4Compressor instance = new LZ4Compressor();
+    private static final ConcurrentHashMap<Pair<String, Integer>, LZ4Compressor> instances = new ConcurrentHashMap<>();
 
-    public static LZ4Compressor create(Map<String, String> args)
+    public static LZ4Compressor create(Map<String, String> args) throws ConfigurationException
     {
+        String compressorType = validateCompressorType(args.get(LZ4_COMPRESSOR_TYPE));
+        Integer compressionLevel = validateCompressionLevel(args.get(LZ4_HIGH_COMPRESSION_LEVEL));
+
+        Pair<String, Integer> compressorTypeAndLevel = Pair.create(compressorType, compressionLevel);
+        LZ4Compressor instance = instances.get(compressorTypeAndLevel);
+        if (instance == null)
+        {
+            if (compressorType.equals(LZ4_FAST_COMPRESSOR) && args.get(LZ4_HIGH_COMPRESSION_LEVEL) != null)
+                logger.warn("'{}' parameter is ignored when '{}' is '{}'", LZ4_HIGH_COMPRESSION_LEVEL, LZ4_COMPRESSOR_TYPE, LZ4_FAST_COMPRESSOR);
+            instance = new LZ4Compressor(compressorType, compressionLevel);
+            LZ4Compressor instanceFromMap = instances.putIfAbsent(compressorTypeAndLevel, instance);
+            if(instanceFromMap != null)
+                instance = instanceFromMap;
+        }
         return instance;
     }
 
     private final net.jpountz.lz4.LZ4Compressor compressor;
     private final net.jpountz.lz4.LZ4FastDecompressor decompressor;
+    @VisibleForTesting
+    final String compressorType;
+    @VisibleForTesting
+    final Integer compressionLevel;
 
-    private LZ4Compressor()
+    private LZ4Compressor(String type, Integer compressionLevel)
     {
+        this.compressorType = type;
+        this.compressionLevel = compressionLevel;
         final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
-        compressor = lz4Factory.fastCompressor();
+        switch (type)
+        {
+            case LZ4_HIGH_COMPRESSOR:
+            {
+                compressor = lz4Factory.highCompressor(compressionLevel);
+                break;
+            }
+            case LZ4_FAST_COMPRESSOR:
+            default:
+            {
+                compressor = lz4Factory.fastCompressor();
+            }
+        }
+
         decompressor = lz4Factory.fastDecompressor();
     }
 
@@ -127,7 +176,50 @@ public class LZ4Compressor implements ICompressor
 
     public Set<String> supportedOptions()
     {
-        return new HashSet<>();
+        return new HashSet<>(Arrays.asList(LZ4_HIGH_COMPRESSION_LEVEL, LZ4_COMPRESSOR_TYPE));
+    }
+
+    public static String validateCompressorType(String compressorType) throws ConfigurationException
+    {
+        if (compressorType == null)
+            return DEFAULT_LZ4_COMPRESSOR_TYPE;
+
+        if (!VALID_COMPRESSOR_TYPES.contains(compressorType))
+        {
+            throw new ConfigurationException(String.format("Invalid compressor type '%s' specified for LZ4 parameter '%s'. "
+                                                           + "Valid options are %s.", compressorType, LZ4_COMPRESSOR_TYPE,
+                                                           VALID_COMPRESSOR_TYPES.toString()));
+        }
+        else
+        {
+            return compressorType;
+        }
+    }
+
+    public static Integer validateCompressionLevel(String compressionLevel) throws ConfigurationException
+    {
+        if (compressionLevel == null)
+            return DEFAULT_HIGH_COMPRESSION_LEVEL;
+
+        ConfigurationException ex = new ConfigurationException("Invalid value [" + compressionLevel + "] for parameter '"
+                                                                 + LZ4_HIGH_COMPRESSION_LEVEL + "'. Value must be between 1 and 17.");
+
+        Integer level;
+        try
+        {
+            level = Integer.parseInt(compressionLevel);
+        }
+        catch (NumberFormatException e)
+        {
+            throw ex;
+        }
+
+        if (level < 1 || level > 17)
+        {
+            throw ex;
+        }
+
+        return level;
     }
 
     public BufferType preferredBufferType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/src/java/org/apache/cassandra/schema/CompressionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index cd1686f..bd10f75 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -56,7 +56,7 @@ public final class CompressionParams
     public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
     public static final String ENABLED = "enabled";
 
-    public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.instance,
+    public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.<String, String>emptyMap()),
                                                                           DEFAULT_CHUNK_LENGTH,
                                                                           Collections.emptyMap());
 
@@ -139,7 +139,7 @@ public final class CompressionParams
 
     public static CompressionParams lz4(Integer chunkLength)
     {
-        return new CompressionParams(LZ4Compressor.instance, chunkLength, Collections.emptyMap());
+        return new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), chunkLength, Collections.emptyMap());
     }
 
     public CompressionParams(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
@@ -272,7 +272,7 @@ public final class CompressionParams
     private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co)
     {
         if (co == null || co.isEmpty())
-            return Collections.<String, String>emptyMap();
+            return Collections.emptyMap();
 
         Map<String, String> compressionOptions = new HashMap<>();
         for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
index 5b23bea..db190bc 100644
--- a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
+++ b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
@@ -3,6 +3,7 @@ package org.apache.cassandra.io.compress;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class CompressorPerformance
@@ -13,7 +14,7 @@ public class CompressorPerformance
         for (ICompressor compressor: new ICompressor[] {
                 SnappyCompressor.instance,  // warm up
                 DeflateCompressor.instance,
-                LZ4Compressor.instance,
+                LZ4Compressor.create(Collections.emptyMap()),
                 SnappyCompressor.instance
         })
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
index 04e471d..3ec0db2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Collections;
 import java.util.Random;
 import javax.crypto.Cipher;
 
@@ -52,7 +53,7 @@ public class SegmentReaderTest
     @Test
     public void compressedSegmenter_LZ4() throws IOException
     {
-        compressedSegmenter(LZ4Compressor.create(null));
+        compressedSegmenter(LZ4Compressor.create(Collections.emptyMap()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
new file mode 100644
index 0000000..a2aff2f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CQLCompressionTest extends CQLTester
+{
+    @Test
+    public void lz4ParamsTest()
+    {
+        createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_high_compressor_level':3}");
+        assertTrue(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType.equals(LZ4Compressor.LZ4_FAST_COMPRESSOR));
+        createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_compressor_type':'high', 'lz4_high_compressor_level':13}");
+        assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType, LZ4Compressor.LZ4_HIGH_COMPRESSOR);
+        assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressionLevel, (Integer)13);
+        createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor'}");
+        assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType, LZ4Compressor.LZ4_FAST_COMPRESSOR);
+        assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressionLevel, (Integer)9);
+    }
+
+    @Test(expected = ConfigurationException.class)
+    public void lz4BadParamsTest() throws Throwable
+    {
+        try
+        {
+            createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_compressor_type':'high', 'lz4_high_compressor_level':113}");
+        }
+        catch (RuntimeException e)
+        {
+            throw e.getCause();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 9b09f0b..66a0e28 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -182,6 +182,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
                                                                    offsetsFile.getPath(),
                                                                    CompressionParams.lz4(BUFFER_SIZE),
                                                                    new MetadataCollector(new ClusteringComparator(UTF8Type.instance))));
+
         }
 
         private TestableCSW(File file, File offsetsFile, CompressedSequentialWriter sw) throws IOException
@@ -196,7 +197,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             Assert.assertFalse(offsetsFile.exists());
             byte[] compressed = readFileToByteArray(file);
             byte[] uncompressed = new byte[partialContents.length];
-            LZ4Compressor.instance.uncompress(compressed, 0, compressed.length - 4, uncompressed, 0);
+            LZ4Compressor.create(Collections.<String, String>emptyMap()).uncompress(compressed, 0, compressed.length - 4, uncompressed, 0);
             Assert.assertTrue(Arrays.equals(partialContents, uncompressed));
         }
 
@@ -214,8 +215,8 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             int offset = (int) offsets.readLong();
             byte[] compressed = readFileToByteArray(file);
             byte[] uncompressed = new byte[fullContents.length];
-            LZ4Compressor.instance.uncompress(compressed, 0, offset - 4, uncompressed, 0);
-            LZ4Compressor.instance.uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length);
+            LZ4Compressor.create(Collections.<String, String>emptyMap()).uncompress(compressed, 0, offset - 4, uncompressed, 0);
+            LZ4Compressor.create(Collections.<String, String>emptyMap()).uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length);
             Assert.assertTrue(Arrays.equals(fullContents, uncompressed));
         }
 


Mime
View raw message