accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: Add ZStandard compression codec. Fixes #438 (#456)
Date Wed, 09 May 2018 21:32:09 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 30707fe  Add ZStandard compression codec. Fixes #438 (#456)
30707fe is described below

commit 30707fe5a0f0d80bf7ac52d8204dd432ead4db4a
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Wed May 9 17:32:06 2018 -0400

    Add ZStandard compression codec. Fixes #438 (#456)
---
 .../mapreduce/lib/impl/FileOutputConfigurator.java |   7 +-
 .../org/apache/accumulo/core/conf/Property.java    |   2 +-
 .../core/file/rfile/bcfile/Compression.java        | 119 +++++++++++++++++++++
 3 files changed, 124 insertions(+), 4 deletions(-)

diff --git a/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java b/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
index e803562..06a769f 100644
--- a/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
+++ b/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
@@ -135,14 +135,15 @@ public class FileOutputConfigurator extends ConfiguratorBase {
    * @param conf
    *          the Hadoop configuration object to configure
    * @param compressionType
-   *          one of "none", "gz", "lzo", or "snappy"
+   *          one of "none", "gz", "lzo", "snappy", or "zstd"
    * @since 1.6.0
    */
   public static void setCompressionType(Class<?> implementingClass, Configuration conf,
       String compressionType) {
     if (compressionType == null
-        || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType))
-      throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy");
+        || !Arrays.asList("none", "gz", "lzo", "snappy", "zstd").contains(compressionType))
+      throw new IllegalArgumentException(
+          "Compression type must be one of: none, gz, lzo, snappy, zstd");
     setAccumuloProperty(implementingClass, conf, Property.TABLE_FILE_COMPRESSION_TYPE,
         compressionType);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b902752..3009975 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -708,7 +708,7 @@ public enum Property {
           + " to change the called Load Balancer for this table"),
   TABLE_FILE_COMPRESSION_TYPE("table.file.compress.type", "gz", PropertyType.STRING,
       "Compression algorithm used on index and data blocks before they are"
-          + " written. Possible values: gz, snappy, lzo, none"),
+          + " written. Possible values: zstd, gz, snappy, lzo, none"),
   TABLE_FILE_COMPRESSED_BLOCK_SIZE("table.file.compress.blocksize", "100K", PropertyType.BYTES,
       "The maximum size of data blocks in RFiles before they are compressed and written."),
   TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX("table.file.compress.blocksize.index", "128K",
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 0e6234a..6d717bb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -76,6 +76,8 @@ public final class Compression {
     }
   }
 
+  /** compression: zStandard */
+  public static final String COMPRESSION_ZSTD = "zstd";
   /** snappy codec **/
   public static final String COMPRESSION_SNAPPY = "snappy";
   /** compression: gzip */
@@ -447,6 +449,122 @@ public final class Compression {
 
         return snappyCodec != null;
       }
+    },
+
+    ZSTANDARD(COMPRESSION_ZSTD) {
+      // Use base type to avoid compile-time dependencies.
+      private transient CompressionCodec zstdCodec = null;
+      /**
+       * determines if we've checked the codec status. ensures we don't recreate the default codec
+       */
+      private final AtomicBoolean checked = new AtomicBoolean(false);
+      private static final String defaultClazz = "org.apache.hadoop.io.compress.ZStandardCodec";
+
+      /**
+       * Buffer size option
+       */
+      private static final String BUFFER_SIZE_OPT = "io.compression.codec.zstd.buffersize";
+
+      /**
+       * Default buffer size value
+       */
+      private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+      @Override
+      public CompressionCodec getCodec() {
+        return zstdCodec;
+      }
+
+      @Override
+      public void initializeDefaultCodec() {
+        if (!checked.get()) {
+          checked.set(true);
+          zstdCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
+        }
+      }
+
+      /**
+       * Creates a new ZStandard codec.
+       *
+       * @param bufferSize
+       *          incoming buffer size
+       * @return new codec or null, depending on if installed
+       */
+      @Override
+      protected CompressionCodec createNewCodec(final int bufferSize) {
+
+        String extClazz = (conf.get(CONF_ZSTD_CLASS) == null ? System.getProperty(CONF_ZSTD_CLASS)
+            : null);
+        String clazz = (extClazz != null) ? extClazz : defaultClazz;
+        try {
+          log.info("Trying to load ZStandard codec class: {}", clazz);
+
+          Configuration myConf = new Configuration(conf);
+          // only use the buffersize if > 0, otherwise we'll use
+          // the default defined within the codec
+          if (bufferSize > 0)
+            myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
+
+          return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+        } catch (ClassNotFoundException e) {
+          // that is okay
+        }
+
+        return null;
+      }
+
+      @Override
+      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
+          int downStreamBufferSize) throws IOException {
+
+        if (!isSupported()) {
+          throw new IOException(
+              "ZStandard codec class not specified. Did you forget to set property "
+                  + CONF_ZSTD_CLASS + "?");
+        }
+        OutputStream bos1;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        // use the default codec
+        CompressionOutputStream cos = zstdCodec.createOutputStream(bos1, compressor);
+        BufferedOutputStream bos2 = new BufferedOutputStream(
+            new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        return bos2;
+      }
+
+      @Override
+      public InputStream createDecompressionStream(InputStream downStream,
+          Decompressor decompressor, int downStreamBufferSize) throws IOException {
+        if (!isSupported()) {
+          throw new IOException(
+              "ZStandard codec class not specified. Did you forget to set property "
+                  + CONF_ZSTD_CLASS + "?");
+        }
+
+        CompressionCodec decomCodec = zstdCodec;
+        // if we're not using the same buffer size, we'll pull the codec from the loading cache
+        if (DEFAULT_BUFFER_SIZE != downStreamBufferSize) {
+          Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(ZSTANDARD, downStreamBufferSize);
+          try {
+            decomCodec = codecCache.get(sizeOpt);
+          } catch (ExecutionException e) {
+            throw new IOException(e);
+          }
+        }
+
+        CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return bis2;
+      }
+
+      @Override
+      public boolean isSupported() {
+        return zstdCodec != null;
+      }
     };
 
     /**
@@ -493,6 +611,7 @@ public final class Compression {
     private static final int DATA_OBUF_SIZE = 4 * 1024;
     public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
     public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
+    public static final String CONF_ZSTD_CLASS = "io.compression.codec.zstd.class";
 
     Algorithm(String name) {
       this.compressName = name;

-- 
To stop receiving notification emails like this one, please contact
mmiller@apache.org.

Mime
View raw message