hadoop-common-commits mailing list archives

From jl...@apache.org
Subject [2/2] hadoop git commit: HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales
Date Wed, 04 Jan 2017 16:20:37 GMT
HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db947fb8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db947fb8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db947fb8

Branch: refs/heads/branch-2
Commit: db947fb8704c2098a7d70216ba6d34e65b070379
Parents: 3207762
Author: Jason Lowe <jlowe@apache.org>
Authored: Wed Jan 4 14:46:44 2017 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Wed Jan 4 14:46:44 2017 +0000

----------------------------------------------------------------------
 BUILDING.txt                                    |  24 +
 hadoop-common-project/hadoop-common/pom.xml     |  21 +
 .../hadoop-common/src/CMakeLists.txt            |  27 ++
 .../hadoop-common/src/config.h.cmake            |   1 +
 .../hadoop/fs/CommonConfigurationKeys.java      |  16 +
 .../apache/hadoop/io/compress/Decompressor.java |   2 +-
 .../hadoop/io/compress/ZStandardCodec.java      | 242 +++++++++
 .../io/compress/zstd/ZStandardCompressor.java   | 305 ++++++++++++
 .../io/compress/zstd/ZStandardDecompressor.java | 323 ++++++++++++
 .../hadoop/io/compress/zstd/package-info.java   |  22 +
 .../apache/hadoop/util/NativeCodeLoader.java    |   5 +
 .../hadoop/util/NativeLibraryChecker.java       |  12 +-
 .../io/compress/zstd/ZStandardCompressor.c      | 259 ++++++++++
 .../io/compress/zstd/ZStandardDecompressor.c    | 218 +++++++++
 .../zstd/org_apache_hadoop_io_compress_zstd.h   |  34 ++
 .../org/apache/hadoop/util/NativeCodeLoader.c   |  10 +
 ...g.apache.hadoop.io.compress.CompressionCodec |   2 +-
 .../src/site/markdown/NativeLibraries.md.vm     |   1 +
 .../apache/hadoop/io/compress/TestCodec.java    |  15 +-
 .../io/compress/TestCompressionStreamReuse.java |   8 +
 .../TestZStandardCompressorDecompressor.java    | 485 +++++++++++++++++++
 .../src/test/resources/zstd/test_file.txt       |  71 +++
 .../src/test/resources/zstd/test_file.txt.zst   | Bin 0 -> 3690 bytes
 hadoop-project-dist/pom.xml                     |   6 +
 hadoop-project/pom.xml                          |   2 +
 25 files changed, 2107 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 2f1f9f8..e596504 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -77,6 +77,8 @@ Optional packages:
   $ sudo apt-get install libjansson-dev
 * Linux FUSE
   $ sudo apt-get install fuse libfuse-dev
+* ZStandard compression
+  $ sudo apt-get install zstd
 
 ----------------------------------------------------------------------------------
 Maven main modules:
@@ -148,6 +150,28 @@ Maven build goals:
     and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the
     bundling and building will fail.
 
+ ZStandard build options:
+
+   ZStandard is a compression library that can be utilized by the native code.
+   It is currently an optional component, meaning that Hadoop can be built with
+   or without this dependency.
+
+  * Use -Drequire.zstd to fail the build if libzstd.so is not found.
+    If this option is not specified and the zstd library is missing,
+    we silently build a version of libhadoop.so that cannot make use of zstd.
+
+  * Use -Dzstd.prefix to specify a nonstandard location for the libzstd
+    header files and library files. You do not need this option if you have
+    installed zstandard using a package manager.
+
+  * Use -Dzstd.lib to specify a nonstandard location for the libzstd library
+    files.  Similarly to zstd.prefix, you do not need this option if you
+    have installed zstd using a package manager.
+
+  * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into
+    the final tar file. This option requires that -Dzstd.lib is also given,
+    and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the
+    bundling and building will fail.
+
  OpenSSL build options:
 
    OpenSSL includes a crypto library that can be utilized by the native code.

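Note: with the options above, a native build that must include zstd support
can be invoked as, for example, "mvn package -Pdist,native -DskipTests -Dtar
-Drequire.zstd", assuming libzstd and its headers are installed in a standard
location.
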
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index ddc85b8..ee7acb3 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -514,6 +514,10 @@
         <snappy.lib></snappy.lib>
         <snappy.include></snappy.include>
         <require.snappy>false</require.snappy>
+        <zstd.prefix></zstd.prefix>
+        <zstd.lib></zstd.lib>
+        <zstd.include></zstd.include>
+        <require.zstd>false</require.zstd>
         <openssl.prefix></openssl.prefix>
         <openssl.lib></openssl.lib>
         <openssl.include></openssl.include>
@@ -568,6 +572,8 @@
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
@@ -599,6 +605,10 @@
                     <CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX>
                     <CUSTOM_SNAPPY_LIB>${snappy.lib} </CUSTOM_SNAPPY_LIB>
                     <CUSTOM_SNAPPY_INCLUDE>${snappy.include} </CUSTOM_SNAPPY_INCLUDE>
+                    <REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD>
+                    <CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX>
+                    <CUSTOM_ZSTD_LIB>${zstd.lib}</CUSTOM_ZSTD_LIB>
+                    <CUSTOM_ZSTD_INCLUDE>${zstd.include}</CUSTOM_ZSTD_INCLUDE>
                     <REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL>
                     <CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX>
                     <CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB>
@@ -636,6 +646,11 @@
         <snappy.include></snappy.include>
         <require.snappy>false</require.snappy>
         <bundle.snappy.in.bin>true</bundle.snappy.in.bin>
+        <zstd.prefix></zstd.prefix>
+        <zstd.lib></zstd.lib>
+        <zstd.include></zstd.include>
+        <require.zstd>false</require.zstd>
+        <bundle.zstd.in.bin>true</bundle.zstd.in.bin>
         <openssl.prefix></openssl.prefix>
         <openssl.lib></openssl.lib>
         <openssl.include></openssl.include>
@@ -685,6 +700,8 @@
                     <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
@@ -736,6 +753,10 @@
                     <argument>/p:CustomSnappyLib=${snappy.lib}</argument>
                     <argument>/p:CustomSnappyInclude=${snappy.include}</argument>
                     <argument>/p:RequireSnappy=${require.snappy}</argument>
+                    <argument>/p:CustomZstdPrefix=${zstd.prefix}</argument>
+                    <argument>/p:CustomZstdLib=${zstd.lib}</argument>
+                    <argument>/p:CustomZstdInclude=${zstd.include}</argument>
+                    <argument>/p:RequireZstd=${require.zstd}</argument>
                     <argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument>
                     <argument>/p:CustomOpensslLib=${openssl.lib}</argument>
                     <argument>/p:CustomOpensslInclude=${openssl.include}</argument>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index c93bfe7..3e52643 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -94,6 +94,31 @@ else()
     endif()
 endif()
 
+# Require Zstandard
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+hadoop_set_find_shared_library_version("1")
+find_library(ZSTD_LIBRARY
+        NAMES zstd
+        PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib
+        ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB})
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+find_path(ZSTD_INCLUDE_DIR
+        NAMES zstd.h
+        PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include
+        ${CUSTOM_ZSTD_INCLUDE})
+if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
+    GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME)
+    set(ZSTD_SOURCE_FILES
+            "${SRC}/io/compress/zstd/ZStandardCompressor.c"
+            "${SRC}/io/compress/zstd/ZStandardDecompressor.c")
+else (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
+    set(ZSTD_INCLUDE_DIR "")
+    set(ZSTD_SOURCE_FILES "")
+    IF(REQUIRE_ZSTD)
+        MESSAGE(FATAL_ERROR "Required zstandard library could not be found.  ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}")
+    ENDIF(REQUIRE_ZSTD)
+endif (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
+
 # Build hardware CRC32 acceleration, if supported on the platform.
 if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
   set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -169,6 +194,7 @@ include_directories(
     ${ZLIB_INCLUDE_DIRS}
     ${BZIP2_INCLUDE_DIR}
     ${SNAPPY_INCLUDE_DIR}
+    ${ZSTD_INCLUDE_DIR}
     ${OPENSSL_INCLUDE_DIR}
     ${SRC}/util
 )
@@ -182,6 +208,7 @@ hadoop_add_dual_library(hadoop
     ${SRC}/io/compress/lz4/lz4.c
     ${SRC}/io/compress/lz4/lz4hc.c
     ${SNAPPY_SOURCE_FILES}
+    ${ZSTD_SOURCE_FILES}
     ${OPENSSL_SOURCE_FILES}
     ${SRC}/io/compress/zlib/ZlibCompressor.c
     ${SRC}/io/compress/zlib/ZlibDecompressor.c

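Note: the CUSTOM_ZSTD_PREFIX, CUSTOM_ZSTD_LIB and CUSTOM_ZSTD_INCLUDE cache
variables consulted above are populated from the -Dzstd.prefix, -Dzstd.lib and
-Dzstd.include Maven properties wired into hadoop-common's pom.xml earlier in
this patch, so a nonstandard libzstd location does not require invoking CMake
by hand.
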
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/config.h.cmake
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index d71271d..f2e1586 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -21,6 +21,7 @@
 #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
 #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
 #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
+#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
 #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
 #cmakedefine HAVE_SYNC_FILE_RANGE
 #cmakedefine HAVE_POSIX_FADVISE

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index ac634ab..49062ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
       256 * 1024;
 
+  /** ZStandard compression level. */
+  public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY =
+      "io.compression.codec.zstd.level";
+
+  /** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */
+  public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3;
+
+  /** ZStandard buffer size. */
+  public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY =
+      "io.compression.codec.zstd.buffersize";
+
+  /** Default value for IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY. A value
+   * of 0 means use the buffer size recommended by the zstd library. */
+  public static final int
+      IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
+
   /** Internal buffer size for Lz4 compressor/decompressors */
   public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
       "io.compression.codec.lz4.buffersize";

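A minimal sketch of how these new keys are consumed (the property names come
straight from the constants above; the chosen values are only illustrative):

    Configuration conf = new Configuration();
    // Trade speed for ratio: level 6 instead of the default 3.
    conf.setInt("io.compression.codec.zstd.level", 6);
    // 0 (the default) means use the buffer size libzstd recommends.
    conf.setInt("io.compression.codec.zstd.buffersize", 64 * 1024);
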
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
index 8cb0b2a..3808003 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
@@ -95,7 +95,7 @@ public interface Decompressor {
    * @param b Buffer for the compressed data
    * @param off Start offset of the data
    * @param len Size of the buffer
-   * @return The actual number of bytes of compressed data.
+   * @return The actual number of bytes of uncompressed data.
    * @throws IOException
    */
   public int decompress(byte[] b, int off, int len) throws IOException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
new file mode 100644
index 0000000..11e98a1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.zstd.ZStandardCompressor;
+import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
+
+/**
+ * This class creates zstd compressors/decompressors.
+ */
+public class ZStandardCodec implements
+    Configurable, CompressionCodec, DirectDecompressionCodec  {
+  private Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public static void checkNativeCodeLoaded() {
+    if (!NativeCodeLoader.isNativeCodeLoaded() ||
+        !NativeCodeLoader.buildSupportsZstd()) {
+      throw new RuntimeException("native zStandard library "
+          + "not available: this version of libhadoop was built "
+          + "without zstd support.");
+    }
+    if (!ZStandardCompressor.isNativeCodeLoaded()) {
+      throw new RuntimeException("native zStandard library not "
+          + "available: ZStandardCompressor has not been loaded.");
+    }
+    if (!ZStandardDecompressor.isNativeCodeLoaded()) {
+      throw new RuntimeException("native zStandard library not "
+          + "available: ZStandardDecompressor has not been loaded.");
+    }
+  }
+
+  public static boolean isNativeCodeLoaded() {
+    return ZStandardCompressor.isNativeCodeLoaded()
+        && ZStandardDecompressor.isNativeCodeLoaded();
+  }
+
+  public static String getLibraryName() {
+    return ZStandardCompressor.getLibraryName();
+  }
+
+  public static int getCompressionLevel(Configuration conf) {
+    return conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
+  }
+
+  public static int getCompressionBufferSize(Configuration conf) {
+    int bufferSize = getBufferSize(conf);
+    return bufferSize == 0 ?
+        ZStandardCompressor.getRecommendedBufferSize() :
+        bufferSize;
+  }
+
+  public static int getDecompressionBufferSize(Configuration conf) {
+    int bufferSize = getBufferSize(conf);
+    return bufferSize == 0 ?
+        ZStandardDecompressor.getRecommendedBufferSize() :
+        bufferSize;
+  }
+
+  private static int getBufferSize(Configuration conf) {
+    return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
+        IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to have compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return Util.
+        createOutputStreamWithCodecPool(this, conf, out);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
+   *
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to have compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor)
+      throws IOException {
+    checkNativeCodeLoaded();
+    return new CompressorStream(out, compressor,
+        getCompressionBufferSize(conf));
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    checkNativeCodeLoaded();
+    return ZStandardCompressor.class;
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    checkNativeCodeLoaded();
+    return new ZStandardCompressor(
+        getCompressionLevel(conf), getCompressionBufferSize(conf));
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return Util.
+        createInputStreamWithCodecPool(this, conf, in);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor)
+      throws IOException {
+    checkNativeCodeLoaded();
+    return new DecompressorStream(in, decompressor,
+        getDecompressionBufferSize(conf));
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by
+   * this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    checkNativeCodeLoaded();
+    return ZStandardDecompressor.class;
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    checkNativeCodeLoaded();
+    return new ZStandardDecompressor(getDecompressionBufferSize(conf));
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return <code>.zst</code>.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return ".zst";
+  }
+
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return new ZStandardDecompressor.ZStandardDirectDecompressor(
+        getDecompressionBufferSize(conf)
+    );
+  }
+}

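A rough end-to-end usage sketch for the codec above, assuming the native
library is loadable (ReflectionUtils, IOUtils and the compression stream types
are existing Hadoop APIs; rawOut, compressedIn and data are hypothetical
placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.*;
    import org.apache.hadoop.util.ReflectionUtils;

    Configuration conf = new Configuration();
    ZStandardCodec codec =
        ReflectionUtils.newInstance(ZStandardCodec.class, conf);
    // Write path: uncompressed bytes in, ".zst"-framed bytes out.
    try (CompressionOutputStream cos = codec.createOutputStream(rawOut)) {
      cos.write(data);
      cos.finish();
    }
    // Read path: compressed bytes in, uncompressed bytes out.
    try (CompressionInputStream cis = codec.createInputStream(compressedIn)) {
      IOUtils.copyBytes(cis, System.out, conf, false);
    }
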
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
new file mode 100644
index 0000000..eb2121a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zstd;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.ZStandardCodec;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Compressor} based on the zStandard compression algorithm.
+ * https://github.com/facebook/zstd
+ */
+public class ZStandardCompressor implements Compressor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZStandardCompressor.class);
+
+  private long stream;
+  private int level;
+  private int directBufferSize;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private ByteBuffer uncompressedDirectBuf = null;
+  private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+  private boolean keepUncompressedBuf = false;
+  private ByteBuffer compressedDirectBuf = null;
+  private boolean finish, finished;
+  private long bytesRead = 0;
+  private long bytesWritten = 0;
+
+  private static boolean nativeZStandardLoaded = false;
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        // Initialize the native library
+        initIDs();
+        nativeZStandardLoaded = true;
+      } catch (Throwable t) {
+        LOG.warn("Error loading zstandard native libraries: " + t);
+      }
+    }
+  }
+
+  public static boolean isNativeCodeLoaded() {
+    return nativeZStandardLoaded;
+  }
+
+  public static int getRecommendedBufferSize() {
+    return getStreamSize();
+  }
+
+  @VisibleForTesting
+  ZStandardCompressor() {
+    this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+
+  /**
+   * Creates a new compressor with the default compression level.
+   * Compressed data will be generated in ZStandard format.
+   */
+  public ZStandardCompressor(int level, int bufferSize) {
+    this(level, bufferSize, bufferSize);
+  }
+
+  @VisibleForTesting
+  ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
+    this.level = level;
+    stream = create();
+    this.directBufferSize = outputBufferSize;
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize);
+    compressedDirectBuf.position(outputBufferSize);
+    reset();
+  }
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration. It will reset the compressor's compression level
+   * and compression strategy.
+   *
+   * @param conf Configuration storing new settings
+   */
+  @Override
+  public void reinit(Configuration conf) {
+    if (conf == null) {
+      return;
+    }
+    level = ZStandardCodec.getCompressionLevel(conf);
+    reset();
+    LOG.debug("Reinit compressor with new compression configuration");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    uncompressedDirectBufOff = 0;
+    setInputFromSavedData();
+
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  //copy enough data from userBuf to uncompressedDirectBuf
+  private void setInputFromSavedData() {
+    int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+    uncompressedDirectBuf.put(userBuf, userBufOff, len);
+    userBufLen -= len;
+    userBufOff += len;
+    uncompressedDirectBufLen = uncompressedDirectBuf.position();
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException(
+        "Dictionary support is not enabled");
+  }
+
+  @Override
+  public boolean needsInput() {
+    // Consume remaining compressed data?
+    if (compressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // have we consumed all input
+    if (keepUncompressedBuf && uncompressedDirectBufLen > 0) {
+      return false;
+    }
+
+    if (uncompressedDirectBuf.remaining() > 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        // copy enough data from userBuf to uncompressedDirectBuf
+        setInputFromSavedData();
+        // uncompressedDirectBuf is not full
+        return uncompressedDirectBuf.remaining() > 0;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public void finish() {
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    // Check if zstd says it is finished and all compressed
+    // data has been consumed
+    return (finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    checkStream();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if there is compressed data
+    int n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      compressedDirectBuf.get(b, off, n);
+      return n;
+    }
+
+    // Re-initialize the output direct buffer
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.limit(directBufferSize);
+
+    // Compress data
+    n = deflateBytesDirect(
+        uncompressedDirectBuf,
+        uncompressedDirectBufOff,
+        uncompressedDirectBufLen,
+        compressedDirectBuf,
+        directBufferSize
+    );
+    compressedDirectBuf.limit(n);
+
+    // Check if we have consumed all input buffer
+    if (uncompressedDirectBufLen <= 0) {
+      // consumed all input buffer
+      keepUncompressedBuf = false;
+      uncompressedDirectBuf.clear();
+      uncompressedDirectBufOff = 0;
+      uncompressedDirectBufLen = 0;
+    } else {
+      //  did not consume all input buffer
+      keepUncompressedBuf = true;
+    }
+
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    compressedDirectBuf.get(b, off, n);
+    return n;
+  }
+
+  /**
+   * Returns the total number of compressed bytes output so far.
+   *
+   * @return the total (non-negative) number of compressed bytes output so far
+   */
+  @Override
+  public long getBytesWritten() {
+    checkStream();
+    return bytesWritten;
+  }
+
+  /**
+   * <p>Returns the total number of uncompressed bytes input so far.</p>
+   *
+   * @return the total (non-negative) number of uncompressed bytes input so far
+   */
+  @Override
+  public long getBytesRead() {
+    checkStream();
+    return bytesRead;
+  }
+
+  @Override
+  public void reset() {
+    checkStream();
+    init(level, stream);
+    finish = false;
+    finished = false;
+    bytesRead = 0;
+    bytesWritten = 0;
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBufOff = 0;
+    uncompressedDirectBufLen = 0;
+    keepUncompressedBuf = false;
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    userBufOff = 0;
+    userBufLen = 0;
+  }
+
+  @Override
+  public void end() {
+    if (stream != 0) {
+      end(stream);
+      stream = 0;
+    }
+  }
+
+  private void checkStream() {
+    if (stream == 0) {
+      throw new NullPointerException();
+    }
+  }
+
+  private static native long create();
+  private static native void init(int level, long stream);
+  private native int deflateBytesDirect(ByteBuffer src, int srcOffset,
+      int srcLen, ByteBuffer dst, int dstLen);
+  private static native int getStreamSize();
+  private static native void end(long strm);
+  private static native void initIDs();
+  public static native String getLibraryName();
+}

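To illustrate the Compressor state machine implemented above, a sketch of the
canonical drain loop (error handling omitted; raw and out are hypothetical
placeholders, and the input is assumed to fit in one internal buffer; the real
stream classes interleave needsInput()/setInput() calls for larger inputs):

    ZStandardCompressor compressor = new ZStandardCompressor(3, 64 * 1024);
    byte[] buf = new byte[64 * 1024];
    compressor.setInput(raw, 0, raw.length);
    compressor.finish();                 // no further input will arrive
    while (!compressor.finished()) {
      int n = compressor.compress(buf, 0, buf.length);
      out.write(buf, 0, n);              // drain the compressed output
    }
    compressor.end();                    // free the native ZSTD_CStream
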
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
new file mode 100644
index 0000000..73d73e1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zstd;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Decompressor} based on the zStandard compression algorithm.
+ * https://github.com/facebook/zstd
+ */
+public class ZStandardDecompressor implements Decompressor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZStandardDecompressor.class);
+
+  private long stream;
+  private int directBufferSize;
+  private ByteBuffer compressedDirectBuf = null;
+  private int compressedDirectBufOff, bytesInCompressedBuffer;
+  private ByteBuffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufferBytesToConsume = 0;
+  private boolean finished;
+  private int remaining = 0;
+
+  private static boolean nativeZStandardLoaded = false;
+
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        // Initialize the native library
+        initIDs();
+        nativeZStandardLoaded = true;
+      } catch (Throwable t) {
+        LOG.warn("Error loading zstandard native libraries: " + t);
+      }
+    }
+  }
+
+  public static boolean isNativeCodeLoaded() {
+    return nativeZStandardLoaded;
+  }
+
+  public static int getRecommendedBufferSize() {
+    return getStreamSize();
+  }
+
+  public ZStandardDecompressor() {
+    this(getStreamSize());
+  }
+
+  /**
+   * Creates a new decompressor.
+   */
+  public ZStandardDecompressor(int bufferSize) {
+    this.directBufferSize = bufferSize;
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    stream = create();
+    reset();
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufferBytesToConsume = len;
+
+    setInputFromSavedData();
+
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  private void setInputFromSavedData() {
+    compressedDirectBufOff = 0;
+    bytesInCompressedBuffer = userBufferBytesToConsume;
+    if (bytesInCompressedBuffer > directBufferSize) {
+      bytesInCompressedBuffer = directBufferSize;
+    }
+
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.put(
+        userBuf, userBufOff, bytesInCompressedBuffer);
+
+    userBufOff += bytesInCompressedBuffer;
+    userBufferBytesToConsume -= bytesInCompressedBuffer;
+  }
+
+  // dictionary is not supported
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException(
+        "Dictionary support is not enabled");
+  }
+
+  @Override
+  public boolean needsInput() {
+    // Consume remaining compressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Check if we have consumed all input
+    if (bytesInCompressedBuffer - compressedDirectBufOff <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufferBytesToConsume <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+    return false;
+  }
+
+  // dictionary is not supported.
+  @Override
+  public boolean needsDictionary() {
+    return false;
+  }
+
+  @Override
+  public boolean finished() {
+    // finished == true if ZSTD_decompressStream() returns 0
+    // also check we have nothing left in our buffer
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len)
+      throws IOException {
+    checkStream();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if there is uncompressed data
+    int n = uncompressedDirectBuf.remaining();
+    if (n > 0) {
+      return populateUncompressedBuffer(b, off, len, n);
+    }
+
+    // Re-initialize the output direct buffer
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBuf.limit(directBufferSize);
+
+    // Decompress data
+    n = inflateBytesDirect(
+        compressedDirectBuf,
+        compressedDirectBufOff,
+        bytesInCompressedBuffer,
+        uncompressedDirectBuf,
+        0,
+        directBufferSize
+    );
+    uncompressedDirectBuf.limit(n);
+
+    // Get at most 'len' bytes
+    return populateUncompressedBuffer(b, off, len, n);
+  }
+
+  /**
+   * <p>Returns the number of bytes remaining in the input buffers;
+   * normally called when finished() is true to determine amount of post-stream
+   * data.</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  @Override
+  public int getRemaining() {
+    checkStream();
+    // userBuf + compressedDirectBuf
+    return userBufferBytesToConsume + remaining;
+  }
+
+  /**
+   * Resets everything including the input buffers (user and direct).
+   */
+  @Override
+  public void reset() {
+    checkStream();
+    init(stream);
+    remaining = 0;
+    finished = false;
+    compressedDirectBufOff = 0;
+    bytesInCompressedBuffer = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = 0;
+    userBufferBytesToConsume = 0;
+  }
+
+  @Override
+  public void end() {
+    if (stream != 0) {
+      free(stream);
+      stream = 0;
+    }
+  }
+
+  @Override
+  protected void finalize() {
+    reset();
+  }
+
+  private void checkStream() {
+    if (stream == 0) {
+      throw new NullPointerException("Stream not initialized");
+    }
+  }
+
+  private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
+    n = Math.min(n, len);
+    uncompressedDirectBuf.get(b, off, n);
+    return n;
+  }
+
+  private static native void initIDs();
+  private static native long create();
+  private static native void init(long stream);
+  private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
+      int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
+  private static native void free(long strm);
+  private static native int getStreamSize();
+
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert
+        (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+
+    int originalPosition = dst.position();
+    int n = inflateBytesDirect(
+        src, src.position(), src.remaining(), dst, dst.position(),
+        dst.remaining()
+    );
+    dst.position(originalPosition + n);
+    if (bytesInCompressedBuffer > 0) {
+      src.position(compressedDirectBufOff);
+    } else {
+      src.position(src.limit());
+    }
+    return n;
+  }
+
+  /**
+   * A {@link DirectDecompressor} for ZStandard
+   * https://github.com/facebook/zstd.
+   */
+  public static class ZStandardDirectDecompressor
+      extends ZStandardDecompressor implements DirectDecompressor {
+
+    public ZStandardDirectDecompressor(int directBufferSize) {
+      super(directBufferSize);
+    }
+
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+
+    private boolean endOfInput;
+
+    @Override
+    public void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";
+      this.inflateDirect(src, dst);
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
+}

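The direct-buffer path added here can be exercised roughly as follows (a
sketch assuming compressed holds one complete zstd frame whose output fits in
dst):

    ZStandardDecompressor.ZStandardDirectDecompressor dd =
        new ZStandardDecompressor.ZStandardDirectDecompressor(64 * 1024);
    ByteBuffer src = ByteBuffer.allocateDirect(compressed.length);
    src.put(compressed);
    src.flip();
    ByteBuffer dst = ByteBuffer.allocateDirect(64 * 1024);
    while (!dd.finished()) {
      dd.decompress(src, dst);   // both buffers must be direct
    }
    dst.flip();                  // dst now holds the uncompressed bytes
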
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
new file mode 100644
index 0000000..9069070
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.io.compress.zstd;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
index 533fc07..ff5803c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
@@ -80,6 +80,11 @@ public class NativeCodeLoader {
   public static native boolean buildSupportsSnappy();
   
   /**
+   * Returns true only if this build was compiled with support for ZStandard.
+   */
+  public static native boolean buildSupportsZstd();
+
+  /**
    * Returns true only if this build was compiled with support for openssl.
    */
   public static native boolean buildSupportsOpenssl();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
index d8c6899..c3ffe58 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.OpensslCipher;
 import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.ZStandardCodec;
 import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -65,6 +66,7 @@ public class NativeLibraryChecker {
     boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded();
     boolean zlibLoaded = false;
     boolean snappyLoaded = false;
+    boolean zStdLoaded = false;
     // lz4 is linked within libhadoop
     boolean lz4Loaded = nativeHadoopLoaded;
     boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
@@ -75,6 +77,7 @@ public class NativeLibraryChecker {
     String hadoopLibraryName = "";
     String zlibLibraryName = "";
     String snappyLibraryName = "";
+    String zstdLibraryName = "";
     String lz4LibraryName = "";
     String bzip2LibraryName = "";
     String winutilsPath = null;
@@ -90,6 +93,11 @@ public class NativeLibraryChecker {
       if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) {
         snappyLibraryName = SnappyCodec.getLibraryName();
       }
+      zStdLoaded = NativeCodeLoader.buildSupportsZstd() &&
+          ZStandardCodec.isNativeCodeLoaded();
+      if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) {
+        zstdLibraryName = ZStandardCodec.getLibraryName();
+      }
       if (OpensslCipher.getLoadingFailureReason() != null) {
         openSslDetail = OpensslCipher.getLoadingFailureReason();
         openSslLoaded = false;
@@ -122,6 +130,7 @@ public class NativeLibraryChecker {
     System.out.printf("hadoop:  %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
     System.out.printf("zlib:    %b %s%n", zlibLoaded, zlibLibraryName);
     System.out.printf("snappy:  %b %s%n", snappyLoaded, snappyLibraryName);
+    System.out.printf("zstd:    %b %s%n", zStdLoaded, zstdLibraryName);
     System.out.printf("lz4:     %b %s%n", lz4Loaded, lz4LibraryName);
     System.out.printf("bzip2:   %b %s%n", bzip2Loaded, bzip2LibraryName);
     System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
@@ -130,7 +139,8 @@ public class NativeLibraryChecker {
     }
 
     if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
-        (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded))) {
+        (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded
+        && zStdLoaded))) {
       // return 1 to indicated check failed
       ExitUtil.terminate(1);
     }

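Note: NativeLibraryChecker backs the "hadoop checknative" command, so with
this change "hadoop checknative -a" reports a zstd row alongside the other
codecs and exits non-zero when zstd support is absent.
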
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
new file mode 100644
index 0000000..04f2a3e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "org_apache_hadoop_io_compress_zstd.h"
+
+#if defined HADOOP_ZSTD_LIBRARY
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifdef UNIX
+#include <dlfcn.h>
+#include "config.h"
+#endif
+
+#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h"
+
+static jfieldID ZStandardCompressor_stream;
+static jfieldID ZStandardCompressor_uncompressedDirectBufOff;
+static jfieldID ZStandardCompressor_uncompressedDirectBufLen;
+static jfieldID ZStandardCompressor_directBufferSize;
+static jfieldID ZStandardCompressor_finish;
+static jfieldID ZStandardCompressor_finished;
+static jfieldID ZStandardCompressor_bytesWritten;
+static jfieldID ZStandardCompressor_bytesRead;
+
+#ifdef UNIX
+static size_t (*dlsym_ZSTD_CStreamInSize)(void);
+static size_t (*dlsym_ZSTD_CStreamOutSize)(void);
+static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void);
+static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
+static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
+static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static unsigned (*dlsym_ZSTD_isError)(size_t);
+static const char * (*dlsym_ZSTD_getErrorName)(size_t);
+#endif
+
+#ifdef WINDOWS
+typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void);
+typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
+typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
+typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
+
+static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize;
+static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize;
+static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream;
+static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream;
+static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream;
+static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream;
+static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream;
+static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
+static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
+static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
+#endif
+
+// Load libzstd.so from disk
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, jclass clazz) {
+#ifdef UNIX
+    // Load libzstd.so
+    void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+    if (!libzstd) {
+        char* msg = (char*)malloc(10000);
+        snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
+        THROW(env, "java/lang/InternalError", msg);
+        return;
+    }
+#endif
+
+#ifdef WINDOWS
+    HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
+    if (!libzstd) {
+        THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
+        return;
+    }
+#endif
+
+#ifdef UNIX
+    // load dynamic symbols
+    dlerror();
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+#endif
+
+#ifdef WINDOWS
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+#endif
+
+    // load fields
+    ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
+    ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z");
+    ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
+    ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I");
+    ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I");
+    ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
+    ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J");
+    ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J");
+}
+
+// Create the compression stream
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, jobject this) {
+    ZSTD_CStream* const stream = dlsym_ZSTD_createCStream();
+    if (stream == NULL) {
+        THROW(env, "java/lang/InternalError", "Error creating the stream");
+        return (jlong)0;
+    }
+    return (jlong) stream;
+}
+
+// Initialize the compression stream
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, jobject this, jint level, jlong stream) {
+    size_t result = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level);
+    if (dlsym_ZSTD_isError(result)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+        return;
+    }
+}
+
+// Free the compression stream
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jobject this, jlong stream) {
+    size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream);
+    if (dlsym_ZSTD_isError(result)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+        return;
+    }
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect
+(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len ) {
+    ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream);
+    if (!stream) {
+        THROW(env, "java/lang/NullPointerException", NULL);
+        return (jint)0;
+    }
+
+    jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead);
+    jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten);
+    jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish);
+
+    // Get the input direct buffer
+    void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+    if (!uncompressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
+        return (jint) 0;
+    }
+
+    // Get the output direct buffer
+    void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+    if (!compressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
+        return (jint) 0;
+    }
+
+    ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, uncompressed_direct_buf_off };
+    ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 };
+
+    size_t size = dlsym_ZSTD_compressStream(stream, &output, &input);
+    if (dlsym_ZSTD_isError(size)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+        return (jint) 0;
+    }
+    if (finish && input.pos == input.size) {
+        // End the stream, flush, and write the frame epilogue
+        size = dlsym_ZSTD_endStream(stream, &output);
+        if (!size) {
+            (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE);
+        }
+    } else {
+        // need to flush the output buffer
+        // this also updates the output buffer position.
+        size = dlsym_ZSTD_flushStream(stream, &output);
+    }
+    if (dlsym_ZSTD_isError(size)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+        return (jint) 0;
+    }
+
+    bytes_read += input.pos;
+    bytes_written += output.pos;
+    (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read);
+    (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written);
+
+    (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos);
+    (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos);
+    return (jint) output.pos;
+}
+
+JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName
+(JNIEnv *env, jclass class) {
+#ifdef UNIX
+    if (dlsym_ZSTD_isError) {
+        Dl_info dl_info;
+        if (dladdr(dlsym_ZSTD_isError, &dl_info)) {
+            return (*env)->NewStringUTF(env, dl_info.dli_fname);
+        }
+    }
+    return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY);
+#endif
+#ifdef WINDOWS
+    LPWSTR filename = NULL;
+    GetLibraryName(dlsym_ZSTD_isError, &filename);
+    if (filename != NULL) {
+        return (*env)->NewString(env, filename, (jsize) wcslen(filename));
+    } else {
+        return (*env)->NewStringUTF(env, "Unavailable");
+    }
+#endif
+}
+
+// Returns the larger of the recommended input and output buffer sizes
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize
+(JNIEnv *env, jobject this) {
+    int x = (int) dlsym_ZSTD_CStreamInSize();
+    int y = (int) dlsym_ZSTD_CStreamOutSize();
+    return (x >= y) ? x : y;
+}
+
+#endif // defined HADOOP_ZSTD_LIBRARY
\ No newline at end of file
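For reference, deflateBytesDirect above is a JNI wrapper around the standard zstd streaming-compression loop. A minimal standalone sketch of that loop follows, linking libzstd directly instead of going through the dlsym indirection used by the patch; the function name compress_buffer and the fixed buffer sizes are illustrative only and assume the output buffer can hold the whole frame:

    #include <stdio.h>
    #include <string.h>
    #include <zstd.h>

    /* Compress len bytes from src into dst and return the frame size.
     * Sketch only: assumes dst_cap is big enough for the whole frame. */
    static size_t compress_buffer(const void *src, size_t len,
                                  void *dst, size_t dst_cap, int level) {
        ZSTD_CStream *cs = ZSTD_createCStream();
        if (cs == NULL) return 0;
        size_t ret = ZSTD_initCStream(cs, level);
        ZSTD_inBuffer in = { src, len, 0 };
        ZSTD_outBuffer out = { dst, dst_cap, 0 };
        while (!ZSTD_isError(ret) && in.pos < in.size && out.pos < out.size) {
            ret = ZSTD_compressStream(cs, &out, &in);
        }
        /* ZSTD_endStream writes the frame epilogue; a return of 0 means it
         * was fully flushed -- the same test deflateBytesDirect uses to set
         * the compressor's `finished` flag. */
        if (!ZSTD_isError(ret)) {
            ret = ZSTD_endStream(cs, &out);
        }
        if (ZSTD_isError(ret)) {
            fprintf(stderr, "zstd: %s\n", ZSTD_getErrorName(ret));
        }
        size_t written = (!ZSTD_isError(ret) && ret == 0) ? out.pos : 0;
        ZSTD_freeCStream(cs);
        return written;
    }

    int main(void) {
        const char *text = "hello, zstd streaming compression";
        char frame[1024];
        size_t n = compress_buffer(text, strlen(text), frame, sizeof(frame), 3);
        printf("compressed %zu input bytes to %zu\n", strlen(text), n);
        return n == 0;
    }

Build with cc -lzstd. The JNI version differs mainly in that it performs one compressStream step per call and hands the input/output positions back to the Java side through the object's fields.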

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
new file mode 100644
index 0000000..1236756
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "org_apache_hadoop_io_compress_zstd.h"
+
+#if defined HADOOP_ZSTD_LIBRARY
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifdef UNIX
+#include <dlfcn.h>
+#include "config.h"
+#endif
+
+#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h"
+
+static jfieldID ZStandardDecompressor_stream;
+static jfieldID ZStandardDecompressor_compressedDirectBufOff;
+static jfieldID ZStandardDecompressor_bytesInCompressedBuffer;
+static jfieldID ZStandardDecompressor_directBufferSize;
+static jfieldID ZStandardDecompressor_finished;
+static jfieldID ZStandardDecompressor_remaining;
+
+#ifdef UNIX
+static size_t (*dlsym_ZSTD_DStreamOutSize)(void);
+static size_t (*dlsym_ZSTD_DStreamInSize)(void);
+static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void);
+static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static unsigned (*dlsym_ZSTD_isError)(size_t);
+static const char * (*dlsym_ZSTD_getErrorName)(size_t);
+#endif
+
+#ifdef WINDOWS
+typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void);
+typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
+typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
+
+static __dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize;
+static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize;
+static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream;
+static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream;
+static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream;
+static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream;
+static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream;
+static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
+static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
+static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
+#endif
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) {
+    // Load libzstd.so
+#ifdef UNIX
+    void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+    if (!libzstd) {
+        char msg[1000];
+        snprintf(msg, sizeof(msg), "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
+        THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+        return;
+    }
+#endif
+
+#ifdef WINDOWS
+    HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
+    if (!libzstd) {
+        THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
+        return;
+    }
+#endif
+
+#ifdef UNIX
+    dlerror();
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+    LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+#endif
+
+#ifdef WINDOWS
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+    LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+#endif
+
+    ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
+    ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
+    ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I");
+    ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I");
+    ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
+    ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I");
+}
+
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jobject this) {
+    ZSTD_DStream * stream = dlsym_ZSTD_createDStream();
+    if (stream == NULL) {
+        THROW(env, "java/lang/InternalError", "Error creating stream");
+        return (jlong) 0;
+    }
+    return (jlong) stream;
+}
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jobject this, jlong stream) {
+    size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream);
+    if (dlsym_ZSTD_isError(result)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+        return;
+    }
+    (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, 0);
+}
+
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass obj, jlong stream) {
+    size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream);
+    if (dlsym_ZSTD_isError(result)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+        return;
+    }
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect
+(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint compressed_direct_buf_off, jint compressed_direct_buf_len, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len) {
+    ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, ZStandardDecompressor_stream);
+    if (!stream) {
+        THROW(env, "java/lang/NullPointerException", NULL);
+        return (jint)0;
+    }
+
+    // Get the input direct buffer
+    void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+    if (!compressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
+        return (jint) 0;
+    }
+
+    // Get the output direct buffer
+    void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+    if (!uncompressed_bytes) {
+        THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
+        return (jint) 0;
+    }
+    uncompressed_bytes = ((char*) uncompressed_bytes) + uncompressed_direct_buf_off;
+
+    ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, compressed_direct_buf_off };
+    ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 0 };
+
+    size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input);
+
+    // check for errors
+    if (dlsym_ZSTD_isError(size)) {
+        THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+        return (jint) 0;
+    }
+    int remaining = input.size - input.pos;
+    (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining);
+
+    // the entire frame has been decoded
+    if (size == 0) {
+        (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
+        size_t result = dlsym_ZSTD_initDStream(stream);
+        if (dlsym_ZSTD_isError(result)) {
+            THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+            return (jint) 0;
+        }
+    }
+    (*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, input.pos);
+    (*env)->SetIntField(env, this, ZStandardDecompressor_bytesInCompressedBuffer, input.size);
+    return (jint) output.pos;
+}
+
+// Returns the larger of the recommended input and output buffer sizes
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize
+(JNIEnv *env, jclass obj) {
+    int x = (int) dlsym_ZSTD_DStreamInSize();
+    int y = (int) dlsym_ZSTD_DStreamOutSize();
+    return (x >= y) ? x : y;
+}
+
+#endif // defined HADOOP_ZSTD_LIBRARY
\ No newline at end of file
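The decompression side follows the matching zstd streaming-decode loop. A minimal sketch, again linking libzstd directly; decompress_buffer is an illustrative name, and the sketch assumes the output buffer can hold the entire decoded frame:

    #include <stdio.h>
    #include <string.h>
    #include <zstd.h>

    /* Decode one zstd frame from src into dst; returns bytes produced.
     * Sketch only: assumes dst_cap can hold the whole decoded frame. */
    static size_t decompress_buffer(const void *src, size_t len,
                                    void *dst, size_t dst_cap) {
        ZSTD_DStream *ds = ZSTD_createDStream();
        if (ds == NULL) return 0;
        size_t ret = ZSTD_initDStream(ds);
        ZSTD_inBuffer in = { src, len, 0 };
        ZSTD_outBuffer out = { dst, dst_cap, 0 };
        while (!ZSTD_isError(ret) && in.pos < in.size && out.pos < out.size) {
            ret = ZSTD_decompressStream(ds, &out, &in);
            /* A return of 0 means the frame is complete -- the same
             * condition inflateBytesDirect uses to set `finished` and
             * re-init the stream for a possible following frame. */
            if (ret == 0) break;
        }
        if (ZSTD_isError(ret)) {
            fprintf(stderr, "zstd: %s\n", ZSTD_getErrorName(ret));
        }
        size_t produced = ZSTD_isError(ret) ? 0 : out.pos;
        ZSTD_freeDStream(ds);
        return produced;
    }

    int main(void) {
        const char *text = "round-trip through the zstd streaming decoder";
        char comp[1024], plain[1024];
        size_t clen = ZSTD_compress(comp, sizeof(comp), text, strlen(text), 3);
        if (ZSTD_isError(clen)) return 1;
        size_t plen = decompress_buffer(comp, clen, plain, sizeof(plain));
        printf("decoded %zu bytes: %.*s\n", plen, (int)plen, plain);
        return plen != strlen(text);
    }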

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
new file mode 100644
index 0000000..78fc0a4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <dlfcn.h>
+#endif
+
+#include <jni.h>
+#include <zstd.h>
+#include <stddef.h>
+
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
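Both translation units resolve their zstd entry points through the LOAD_DYNAMIC_SYMBOL macro, which on UNIX expands to the usual POSIX dlopen/dlsym pattern. A minimal sketch of that pattern, using one symbol from the code above; "libzstd.so.1" is an assumed soname for a typical Linux install, while the build normally bakes the name in via the HADOOP_ZSTD_LIBRARY define:

    #include <dlfcn.h>
    #include <stdio.h>
    #include <stddef.h>

    static unsigned (*my_ZSTD_isError)(size_t);

    int main(void) {
        /* RTLD_LAZY | RTLD_GLOBAL matches the flags used in initIDs. */
        void *libzstd = dlopen("libzstd.so.1", RTLD_LAZY | RTLD_GLOBAL);
        if (!libzstd) {
            fprintf(stderr, "dlopen failed: %s\n", dlerror());
            return 1;
        }
        dlerror();  /* clear any stale error state before dlsym */
        *(void **)(&my_ZSTD_isError) = dlsym(libzstd, "ZSTD_isError");
        const char *err = dlerror();
        if (err) {
            fprintf(stderr, "dlsym failed: %s\n", err);
            return 1;
        }
        printf("ZSTD_isError(0) = %u\n", my_ZSTD_isError(0));
        return 0;
    }

Build with cc -ldl. Because the symbols are bound at runtime, libhadoop.so carries no link-time dependency on libzstd, which is what lets ZStandard remain an optional component.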

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
index 3625112..704f40c 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
@@ -39,6 +39,16 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSup
 #endif
 }
 
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd
+  (JNIEnv *env, jclass clazz)
+{
+#ifdef HADOOP_ZSTD_LIBRARY
+  return JNI_TRUE;
+#else
+  return JNI_FALSE;
+#endif
+}
+
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl
   (JNIEnv *env, jclass clazz)
 {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
index df46e32..568972e 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
+++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
@@ -17,4 +17,4 @@ org.apache.hadoop.io.compress.DeflateCodec
 org.apache.hadoop.io.compress.GzipCodec
 org.apache.hadoop.io.compress.Lz4Codec
 org.apache.hadoop.io.compress.SnappyCodec
-
+org.apache.hadoop.io.compress.ZStandardCodec

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
index 04ff426..e4f720c 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
@@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native libraries are loaded corr
        hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0
        zlib:   true /lib/x86_64-linux-gnu/libz.so.1
        snappy: true /usr/lib/libsnappy.so.1
+       zstd:   true /usr/lib/libzstd.so.1
        lz4:    true revision:99
        bzip2:  false
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 1029517..4bb79a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -75,6 +75,7 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 import static org.junit.Assert.*;
+import static org.junit.Assume.assumeTrue;
 
 public class TestCodec {
 
@@ -519,6 +520,18 @@ public class TestCodec {
     }
   }
 
+  @Test(timeout=20000)
+  public void testSequenceFileZStandardCodec() throws Exception {
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    Configuration conf = new Configuration();
+    sequenceFileCodecTest(conf, 0,
+        "org.apache.hadoop.io.compress.ZStandardCodec", 100);
+    sequenceFileCodecTest(conf, 100,
+        "org.apache.hadoop.io.compress.ZStandardCodec", 100);
+    sequenceFileCodecTest(conf, 200000,
+        "org.apache.hadoop.io.compress.ZStandardCodec", 1000000);
+  }
+
   @Test
   public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {
@@ -581,7 +594,7 @@ public class TestCodec {
    */
   @Test
   public void testSnappyMapFile() throws Exception {
-    Assume.assumeTrue(SnappyCodec.isNativeCodeLoaded());
+    assumeTrue(SnappyCodec.isNativeCodeLoaded());
     codecTestMapFile(SnappyCodec.class, CompressionType.BLOCK, 100);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
index 2d75a2d..7b55cac 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 import org.junit.Test;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 public class TestCompressionStreamReuse {
   private static final Log LOG = LogFactory
@@ -69,6 +70,13 @@ public class TestCompressionStreamReuse {
         "org.apache.hadoop.io.compress.GzipCodec");
   }
 
+  @Test
+  public void testZStandardCompressStreamReuse() throws IOException {
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.ZStandardCodec");
+  }
+
   private void resetStateTest(Configuration conf, int seed, int count,
       String codecClass) throws IOException {
     // Create the codec

