parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-cpp git commit: PARQUET-456: Finish gzip implementation and unit test all compressors
Date Fri, 12 Feb 2016 23:38:52 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 5f3499c60 -> 05cd4ec28


PARQUET-456: Finish gzip implementation and unit test all compressors

We should perhaps separate compression and decompression code (as in Impala) as gzip is more
stateful than the other compressors.

Closes #11 when merged.

Author: Wes McKinney <wes@cloudera.com>
Author: Konstantin Knizhnik <knizhnik@garret.ru>

Closes #48 from wesm/PARQUET-456 and squashes the following commits:

5aeba2a [Wes McKinney] Comment typo
8e1f8f2 [Wes McKinney] Move test run to shell script and enable OS X
633fd71 [Wes McKinney] Port gzip codec code from Impala, expand tests, get them to pass
a8d3c11 [Wes McKinney] Add compression round-trip test, gzip needs a bunch more work though
0bc8cf7 [Wes McKinney] Fix PATH_SUFFIXES for zlib
69548c9 [Konstantin Knizhnik] Add zlib to thirdparty build toolchain for compression codec


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/05cd4ec2
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/05cd4ec2
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/05cd4ec2

Branch: refs/heads/master
Commit: 05cd4ec2806316f6397857bd9e58263a0de38e32
Parents: 5f3499c
Author: Wes McKinney <wes@cloudera.com>
Authored: Fri Feb 12 15:37:38 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Fri Feb 12 15:37:38 2016 -0800

----------------------------------------------------------------------
 .travis.yml                             |   5 +-
 CMakeLists.txt                          |   6 +
 ci/run_tests.sh                         |   5 +
 cmake_modules/FindZLIB.cmake            |  92 ++++++++++++++
 setup_build_env.sh                      |   1 +
 src/parquet/compression/CMakeLists.txt  |   6 +-
 src/parquet/compression/codec-test.cc   |  87 ++++++++++++++
 src/parquet/compression/codec.h         |  76 +++++++++---
 src/parquet/compression/gzip-codec.cc   | 171 +++++++++++++++++++++++++++
 src/parquet/compression/lz4-codec.cc    |  12 +-
 src/parquet/compression/snappy-codec.cc |  10 +-
 src/parquet/util/test-common.h          |   8 ++
 thirdparty/build_thirdparty.sh          |   8 ++
 thirdparty/download_thirdparty.sh       |   5 +
 thirdparty/versions.sh                  |   4 +
 15 files changed, 465 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a860bd7..09f4705 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -40,8 +40,5 @@ before_script:
 
 script:
 - make
-- >
-  if [ $TRAVIS_OS_NAME == linux ]; then
-    valgrind --tool=memcheck --leak-check=yes --error-exitcode=1 ctest;
-  fi
+- source $TRAVIS_BUILD_DIR/ci/run_tests.sh
 - make lint

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d262375..ec7d66b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -174,6 +174,12 @@ include_directories(SYSTEM ${LZ4_INCLUDE_DIR})
 add_library(lz4static STATIC IMPORTED)
 set_target_properties(lz4static PROPERTIES IMPORTED_LOCATION ${LZ4_STATIC_LIB})
 
+## ZLIB
+find_package(ZLIB REQUIRED)
+include_directories(SYSTEM ${ZLIB_INCLUDE_DIRS})
+add_library(zlibstatic STATIC IMPORTED)
+set_target_properties(zlibstatic PROPERTIES IMPORTED_LOCATION ${ZLIB_LIBRARIES})
+
 ## GTest
 find_package(GTest REQUIRED)
 include_directories(SYSTEM ${GTEST_INCLUDE_DIR})

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/ci/run_tests.sh
----------------------------------------------------------------------
diff --git a/ci/run_tests.sh b/ci/run_tests.sh
new file mode 100755
index 0000000..aa7b2f6
--- /dev/null
+++ b/ci/run_tests.sh
@@ -0,0 +1,5 @@
+if [ $TRAVIS_OS_NAME == "linux" ]; then
+  valgrind --tool=memcheck --leak-check=yes --error-exitcode=1 ctest;
+else
+  ctest;
+fi

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/cmake_modules/FindZLIB.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindZLIB.cmake b/cmake_modules/FindZLIB.cmake
new file mode 100644
index 0000000..0d7f2ae
--- /dev/null
+++ b/cmake_modules/FindZLIB.cmake
@@ -0,0 +1,92 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Tries to find ZLIB headers and libraries.
+#
+# Usage of this module as follows:
+#
+#  find_package(ZLIB)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+#  ZLIB_HOME - When set, this path is inspected instead of standard library
+#             locations as the root of the ZLIB installation.
+#             The environment variable ZLIB_HOME overrides this variable.
+#
+# - Find ZLIB (zlib.h, libz.a, libz.so, and libz.so.1)
+# This module defines
+#  ZLIB_INCLUDE_DIR, directory containing headers
+#  ZLIB_LIBS, directory containing zlib libraries
+#  ZLIB_STATIC_LIB, path to libz.a
+#  ZLIB_SHARED_LIB, path to libz's shared library
+#  ZLIB_FOUND, whether zlib has been found
+
+if( NOT "$ENV{ZLIB_HOME}" STREQUAL "")
+    file( TO_CMAKE_PATH "$ENV{ZLIB_HOME}" _native_path )
+    list( APPEND _zlib_roots ${_native_path} )
+elseif ( ZLIB_HOME )
+    list( APPEND _zlib_roots ${ZLIB_HOME} )
+endif()
+
+# Try the parameterized roots, if they exist
+if ( _zlib_roots )
+    find_path( ZLIB_INCLUDE_DIR NAMES zlib.h
+        PATHS ${_zlib_roots} NO_DEFAULT_PATH
+        PATH_SUFFIXES "include" )
+    find_library( ZLIB_LIBRARIES NAMES z
+        PATHS ${_zlib_roots} NO_DEFAULT_PATH
+        PATH_SUFFIXES "lib" )
+else ()
+    find_path( ZLIB_INCLUDE_DIR NAMES zlib.h )
+    find_library( ZLIB_LIBRARIES NAMES z )
+endif ()
+
+
+if (ZLIB_INCLUDE_DIR AND ZLIB_LIBRARIES)
+  set(ZLIB_FOUND TRUE)
+  get_filename_component( ZLIB_LIBS ${ZLIB_LIBRARIES} DIRECTORY )
+  set(ZLIB_LIB_NAME libz)
+  set(ZLIB_STATIC_LIB ${ZLIB_LIBS}/${ZLIB_LIB_NAME}.a)
+  set(ZLIB_SHARED_LIB ${ZLIB_LIBS}/${ZLIB_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+else ()
+  set(ZLIB_FOUND FALSE)
+endif ()
+
+if (ZLIB_FOUND)
+  if (NOT ZLIB_FIND_QUIETLY)
+    message(STATUS "Found the ZLIB library: ${ZLIB_LIBRARIES}")
+  endif ()
+else ()
+  if (NOT ZLIB_FIND_QUIETLY)
+    set(ZLIB_ERR_MSG "Could not find the ZLIB library. Looked in ")
+    if ( _zlib_roots )
+      set(ZLIB_ERR_MSG "${ZLIB_ERR_MSG} in ${_zlib_roots}.")
+    else ()
+      set(ZLIB_ERR_MSG "${ZLIB_ERR_MSG} system search paths.")
+    endif ()
+    if (ZLIB_FIND_REQUIRED)
+      message(FATAL_ERROR "${ZLIB_ERR_MSG}")
+    else (ZLIB_FIND_REQUIRED)
+      message(STATUS "${ZLIB_ERR_MSG}")
+    endif (ZLIB_FIND_REQUIRED)
+  endif ()
+endif ()
+
+mark_as_advanced(
+  ZLIB_INCLUDE_DIR
+  ZLIB_LIBS
+  ZLIB_LIBRARIES
+  ZLIB_STATIC_LIB
+  ZLIB_SHARED_LIB
+)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/setup_build_env.sh
----------------------------------------------------------------------
diff --git a/setup_build_env.sh b/setup_build_env.sh
index c95b889..6df1f49 100755
--- a/setup_build_env.sh
+++ b/setup_build_env.sh
@@ -12,6 +12,7 @@ source thirdparty/versions.sh
 
 export SNAPPY_HOME=$BUILD_DIR/thirdparty/installed
 export LZ4_HOME=$BUILD_DIR/thirdparty/installed
+export ZLIB_HOME=$BUILD_DIR/thirdparty/installed
 # build script doesn't support building thrift on OSX
 if [ "$(uname)" != "Darwin" ]; then
   export THRIFT_HOME=$BUILD_DIR/thirdparty/installed

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt
index 04f6535..2c0b67c 100644
--- a/src/parquet/compression/CMakeLists.txt
+++ b/src/parquet/compression/CMakeLists.txt
@@ -18,10 +18,12 @@
 add_library(parquet_compression STATIC
   lz4-codec.cc
   snappy-codec.cc
+  gzip-codec.cc
 )
 target_link_libraries(parquet_compression
   lz4static
-  snappystatic)
+  snappystatic
+  zlibstatic)
 
 set_target_properties(parquet_compression
   PROPERTIES
@@ -31,3 +33,5 @@ set_target_properties(parquet_compression
 install(FILES
   codec.h
   DESTINATION include/parquet/compression)
+
+ADD_PARQUET_TEST(codec-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/codec-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc
new file mode 100644
index 0000000..610fb37
--- /dev/null
+++ b/src/parquet/compression/codec-test.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include "parquet/util/test-common.h"
+
+#include "parquet/compression/codec.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet_cpp {
+
+template <typename T>
+void CheckCodecRoundtrip(const vector<uint8_t>& data) {
+  // create multiple compressors to try to break them
+  T c1;
+  T c2;
+
+  int max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]);
+  std::vector<uint8_t> compressed(max_compressed_len);
+  std::vector<uint8_t> decompressed(data.size());
+
+  // compress with c1
+  int actual_size = c1.Compress(data.size(), &data[0], max_compressed_len,
+      &compressed[0]);
+  compressed.resize(actual_size);
+
+  // decompress with c2
+  c2.Decompress(compressed.size(), &compressed[0],
+      decompressed.size(), &decompressed[0]);
+
+  ASSERT_TRUE(test::vector_equal(data, decompressed));
+
+  // compress with c2
+  int actual_size2 = c2.Compress(data.size(), &data[0], max_compressed_len,
+      &compressed[0]);
+  ASSERT_EQ(actual_size2, actual_size);
+
+  // decompress with c1
+  c1.Decompress(compressed.size(), &compressed[0],
+      decompressed.size(), &decompressed[0]);
+
+  ASSERT_TRUE(test::vector_equal(data, decompressed));
+}
+
+template <typename T>
+void CheckCodec() {
+  int sizes[] = {10000, 100000};
+  for (int data_size : sizes) {
+    vector<uint8_t> data;
+    test::random_bytes(data_size, 1234, &data);
+    CheckCodecRoundtrip<T>(data);
+  }
+}
+
+TEST(TestCompressors, Snappy) {
+  CheckCodec<SnappyCodec>();
+}
+
+TEST(TestCompressors, Lz4) {
+  CheckCodec<Lz4Codec>();
+}
+
+TEST(TestCompressors, GZip) {
+  CheckCodec<GZipCodec>();
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index 743a17d..8fc4ada 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -20,6 +20,8 @@
 
 #include <cstdint>
 
+#include <zlib.h>
+
 #include "parquet/exception.h"
 
 namespace parquet_cpp {
@@ -27,13 +29,13 @@ namespace parquet_cpp {
 class Codec {
  public:
   virtual ~Codec() {}
-  virtual void Decompress(int input_len, const uint8_t* input,
-      int output_len, uint8_t* output_buffer) = 0;
+  virtual void Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer) = 0;
 
-  virtual int Compress(int input_len, const uint8_t* input,
-      int output_buffer_len, uint8_t* output_buffer) = 0;
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer) = 0;
 
-  virtual int MaxCompressedLen(int input_len, const uint8_t* input) = 0;
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0;
 
   virtual const char* name() const = 0;
 };
@@ -42,13 +44,13 @@ class Codec {
 // Snappy codec.
 class SnappyCodec : public Codec {
  public:
-  virtual void Decompress(int input_len, const uint8_t* input,
-      int output_len, uint8_t* output_buffer);
+  virtual void Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer);
 
-  virtual int Compress(int input_len, const uint8_t* input,
-      int output_buffer_len, uint8_t* output_buffer);
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer);
 
-  virtual int MaxCompressedLen(int input_len, const uint8_t* input);
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
 
   virtual const char* name() const { return "snappy"; }
 };
@@ -56,17 +58,61 @@ class SnappyCodec : public Codec {
 // Lz4 codec.
 class Lz4Codec : public Codec {
  public:
-  virtual void Decompress(int input_len, const uint8_t* input,
-      int output_len, uint8_t* output_buffer);
+  virtual void Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer);
 
-  virtual int Compress(int input_len, const uint8_t* input,
-      int output_buffer_len, uint8_t* output_buffer);
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer);
 
-  virtual int MaxCompressedLen(int input_len, const uint8_t* input);
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
 
   virtual const char* name() const { return "lz4"; }
 };
 
+// GZip codec.
+class GZipCodec : public Codec {
+ public:
+  /// Compression formats supported by the zlib library
+  enum Format {
+    ZLIB,
+    DEFLATE,
+    GZIP,
+  };
+
+  explicit GZipCodec(Format format = GZIP);
+
+  virtual void Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer);
+
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer);
+
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
+
+  virtual const char* name() const { return "gzip"; }
+
+ private:
+  // zlib is stateful and the z_stream state variable must be initialized
+  // before it is used (see InitCompressor / InitDecompressor)
+  z_stream stream_;
+
+  // Realistically, this will always be GZIP, but we leave the option open to
+  // configure
+  Format format_;
+
+  // These variables are mutually exclusive. When the codec is in "compressor"
+  // state, compressor_initialized_ is true while decompressor_initialized_ is
+  // false. When it's decompressing, the opposite is true.
+  //
+  // Indeed, this is slightly hacky, but the alternative is having separate
+  // Compressor and Decompressor classes. If this ever becomes an issue, we can
+  // perform the refactoring then
+  void InitCompressor();
+  void InitDecompressor();
+  bool compressor_initialized_;
+  bool decompressor_initialized_;
+};
+
 } // namespace parquet_cpp
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/gzip-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc
new file mode 100644
index 0000000..6ec2726
--- /dev/null
+++ b/src/parquet/compression/gzip-codec.cc
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/compression/codec.h"
+
+#include <cstring>
+#include <sstream>
+
+namespace parquet_cpp {
+
+// These are magic numbers from zlib.h.  Not clear why they are not defined
+// there.
+
+// Maximum window size
+static constexpr int WINDOW_BITS = 15;
+
+// Output Gzip.
+static constexpr int GZIP_CODEC = 16;
+
+// Determine if this is zlib or gzip from header.
+static constexpr int DETECT_CODEC = 32;
+
+GZipCodec::GZipCodec(Format format) :
+    format_(format),
+    compressor_initialized_(false),
+    decompressor_initialized_(false) {
+}
+
+void GZipCodec::InitCompressor() {
+  memset(&stream_, 0, sizeof(stream_));
+
+  int ret;
+  // Initialize to run specified format
+  int window_bits = WINDOW_BITS;
+  if (format_ == DEFLATE) {
+    window_bits = -window_bits;
+  } else if (format_ == GZIP) {
+    window_bits += GZIP_CODEC;
+  }
+  if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
+              window_bits, 9, Z_DEFAULT_STRATEGY)) != Z_OK) {
+    throw ParquetException("zlib deflateInit failed: " +
+        std::string(stream_.msg));
+  }
+
+  compressor_initialized_ = true;
+  decompressor_initialized_ = false;
+}
+
+void GZipCodec::InitDecompressor() {
+  memset(&stream_, 0, sizeof(stream_));
+
+  int ret;
+
+  // Initialize to run either deflate or zlib/gzip format
+  int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
+  if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
+    throw ParquetException("zlib inflateInit failed: " +  std::string(stream_.msg));
+  }
+
+  compressor_initialized_ = false;
+  decompressor_initialized_ = true;
+}
+
+void GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
+    int64_t output_length, uint8_t* output) {
+  if (!decompressor_initialized_) {
+    InitDecompressor();
+  }
+  if (output_length == 0) {
+    // The zlib library does not allow *output to be NULL, even when output_length
+    // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
+    // error, so bail early if no output is expected. Note that we don't signal
+    // an error if the input actually contains compressed data.
+    return;
+  }
+
+  // Reset the stream for this block
+  if (inflateReset(&stream_) != Z_OK) {
+    throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg));
+  }
+
+  int ret = 0;
+  // gzip can run in streaming mode or non-streaming mode.  We only
+  // support the non-streaming use case where we present it the entire
+  // compressed input and a buffer big enough to contain the entire
+  // decompressed output.  In the case where we don't know the output,
+  // we just make a bigger buffer and try the non-streaming mode
+  // from the beginning again.
+  while (ret != Z_STREAM_END) {
+    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+    stream_.avail_in = input_length;
+    stream_.next_out = reinterpret_cast<Bytef*>(output);
+    stream_.avail_out = output_length;
+
+    // We know the output size.  In this case, we can use Z_FINISH
+    // which is more efficient.
+    ret = inflate(&stream_, Z_FINISH);
+    if (ret == Z_STREAM_END || ret != Z_OK) break;
+
+    // Failure, buffer was too small
+    std::stringstream ss;
+    ss << "Too small a buffer passed to GZipCodec. InputLength="
+       << input_length << " OutputLength=" << output_length;
+    throw ParquetException(ss.str());
+  }
+
+  // Failure for some other reason
+  if (ret != Z_STREAM_END) {
+    std::stringstream ss;
+    ss << "GZipCodec failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+}
+
+int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
+  // Must be in compression mode
+  if (!compressor_initialized_) {
+    InitCompressor();
+  }
+  // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
+  return deflateBound(&stream_, static_cast<uLong>(input_length));
+}
+
+int64_t GZipCodec::Compress(int64_t input_length, const uint8_t* input,
+    int64_t output_length, uint8_t* output) {
+  if (!compressor_initialized_) {
+    InitCompressor();
+  }
+  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+  stream_.avail_in = input_length;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = output_length;
+
+  int64_t ret = 0;
+  if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
+    if (ret == Z_OK) {
+      // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
+      // small
+      throw ParquetException("zlib deflate failed, output buffer to small");
+    }
+    std::stringstream ss;
+    ss << "zlib deflate failed: " << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+
+  if (deflateReset(&stream_) != Z_OK) {
+    throw ParquetException("zlib deflateReset failed: " +
+        std::string(stream_.msg));
+  }
+
+  // Actual output length
+  return output_length - stream_.avail_out;
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/lz4-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc
index 7b485f6..dfd50f6 100644
--- a/src/parquet/compression/lz4-codec.cc
+++ b/src/parquet/compression/lz4-codec.cc
@@ -21,21 +21,21 @@
 
 namespace parquet_cpp {
 
-void Lz4Codec::Decompress(int input_len, const uint8_t* input,
-      int output_len, uint8_t* output_buffer) {
-  int n = LZ4_decompress_fast(reinterpret_cast<const char*>(input),
+void Lz4Codec::Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer) {
+  int64_t n = LZ4_decompress_fast(reinterpret_cast<const char*>(input),
       reinterpret_cast<char*>(output_buffer), output_len);
   if (n != input_len) {
     throw parquet_cpp::ParquetException("Corrupt lz4 compressed data.");
   }
 }
 
-int Lz4Codec::MaxCompressedLen(int input_len, const uint8_t* input) {
+int64_t Lz4Codec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
   return LZ4_compressBound(input_len);
 }
 
-int Lz4Codec::Compress(int input_len, const uint8_t* input,
-    int output_buffer_len, uint8_t* output_buffer) {
+int64_t Lz4Codec::Compress(int64_t input_len, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output_buffer) {
   return LZ4_compress(reinterpret_cast<const char*>(input),
       reinterpret_cast<char*>(output_buffer), input_len);
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
index 0c7a63e..4135a15 100644
--- a/src/parquet/compression/snappy-codec.cc
+++ b/src/parquet/compression/snappy-codec.cc
@@ -21,20 +21,20 @@
 
 namespace parquet_cpp {
 
-void SnappyCodec::Decompress(int input_len, const uint8_t* input,
-      int output_len, uint8_t* output_buffer) {
+void SnappyCodec::Decompress(int64_t input_len, const uint8_t* input,
+      int64_t output_len, uint8_t* output_buffer) {
   if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
       static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer)))
{
     throw parquet_cpp::ParquetException("Corrupt snappy compressed data.");
   }
 }
 
-int SnappyCodec::MaxCompressedLen(int input_len, const uint8_t* input) {
+int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
   return snappy::MaxCompressedLength(input_len);
 }
 
-int SnappyCodec::Compress(int input_len, const uint8_t* input,
-    int output_buffer_len, uint8_t* output_buffer) {
+int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output_buffer) {
   size_t output_len;
   snappy::RawCompress(reinterpret_cast<const char*>(input),
       static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer),

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index 84519d6..e75b163 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -95,6 +95,14 @@ static inline vector<bool> flip_coins(size_t n, double p) {
   return draws;
 }
 
+void random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out) {
+  std::mt19937 gen(seed);
+  std::uniform_int_distribution<int> d(0, 255);
+
+  for (int i = 0; i < n; ++i) {
+    out->push_back(d(gen) & 0xFF);
+  }
+}
 
 } // namespace test
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/thirdparty/build_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh
index 5e5cf6a..5e5fca4 100755
--- a/thirdparty/build_thirdparty.sh
+++ b/thirdparty/build_thirdparty.sh
@@ -16,6 +16,7 @@ else
   for arg in "$*"; do
     case $arg in
       "lz4")        F_LZ4=1 ;;
+      "zlib")       F_ZLIB=1 ;;
       "gtest")      F_GTEST=1 ;;
       "snappy")     F_SNAPPY=1 ;;
       "thrift")     F_THRIFT=1 ;;
@@ -74,6 +75,13 @@ if [ -n "$F_ALL" -o -n "$F_LZ4" ]; then
   make -j$PARALLEL install
 fi
 
+# build zlib
+if [ -n "$F_ALL" -o -n "$F_ZLIB" ]; then
+  cd $TP_DIR/$ZLIB_BASEDIR
+  CFLAGS=-fPIC cmake -DCMAKE_INSTALL_PREFIX:PATH=$PREFIX $ZLIB_DIR
+  make -j$PARALLEL install
+fi
+
 # build thrift
 if [ -n "$F_ALL" -o -n "$F_THRIFT" ]; then
   if [ "$(uname)" == "Darwin" ]; then

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/thirdparty/download_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh
index 9111cd4..e0dd7fd 100755
--- a/thirdparty/download_thirdparty.sh
+++ b/thirdparty/download_thirdparty.sh
@@ -33,3 +33,8 @@ if [ ! -d ${THRIFT_BASEDIR} ]; then
   echo "Fetching thrift"
   download_extract_and_cleanup $THRIFT_URL
 fi
+
+if [ ! -d ${ZLIB_BASEDIR} ]; then
+  echo "Fetching zlib"
+  download_extract_and_cleanup $ZLIB_URL
+fi

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/05cd4ec2/thirdparty/versions.sh
----------------------------------------------------------------------
diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh
index 9fa2d31..8c22265 100755
--- a/thirdparty/versions.sh
+++ b/thirdparty/versions.sh
@@ -13,3 +13,7 @@ THRIFT_BASEDIR=thrift-$THRIFT_VERSION
 GTEST_VERSION=1.7.0
 GTEST_URL="https://github.com/google/googletest/archive/release-${GTEST_VERSION}.tar.gz"
 GTEST_BASEDIR=googletest-release-$GTEST_VERSION
+
+ZLIB_VERSION=1.2.8
+ZLIB_URL=http://zlib.net/zlib-${ZLIB_VERSION}.tar.gz
+ZLIB_BASEDIR=zlib-${ZLIB_VERSION}


Mime
View raw message