parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-cpp git commit: PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, related refactoring
Date Sun, 21 Feb 2016 00:11:59 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 9cab887f2 -> 891985439


PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, related refactoring

This also restores passing through the user's `CMAKE_CXX_FLAGS`; dropping them had
unfortunately allowed some compiler warnings to creep into our build.

Author: Wes McKinney <wesm@apache.org>

Closes #58 from wesm/PARQUET-457 and squashes the following commits:

4bf12ed [Wes McKinney] * SerializeThriftMsg now writes into an OutputStream. * Refactor page
serialization in advance of compression tests * Test compression roundtrip on random bytes
for snappy and gzip * Trying LZO compression results in ParquetException * Don't lose user's
CMAKE_CXX_FLAGS * Remove Travis CI directory caching for now * Fix gzip memory leak if you
do not call inflateEnd, deflateEnd


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/89198543
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/89198543
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/89198543

Branch: refs/heads/master
Commit: 89198543987ea8830501f35ba11581bf3a1b5a03
Parents: 9cab887
Author: Wes McKinney <wesm@apache.org>
Authored: Sat Feb 20 16:11:41 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Sat Feb 20 16:11:41 2016 -0800

----------------------------------------------------------------------
 .travis.yml                                  |   3 -
 CMakeLists.txt                               |   6 +-
 src/parquet/column/levels-test.cc            |   1 +
 src/parquet/column/page.h                    |  16 +-
 src/parquet/column/test-util.h               |  29 ---
 src/parquet/compression/CMakeLists.txt       |   1 +
 src/parquet/compression/codec.cc             |  47 +++++
 src/parquet/compression/codec.h              |   8 +
 src/parquet/compression/gzip-codec.cc        |  31 ++-
 src/parquet/encodings/plain-encoding-test.cc |   8 +-
 src/parquet/file/file-deserialize-test.cc    | 232 +++++++++++++++++-----
 src/parquet/file/reader-internal.cc          |  30 +--
 src/parquet/schema/schema-descriptor-test.cc |   1 +
 src/parquet/thrift/CMakeLists.txt            |   2 -
 src/parquet/thrift/serializer-test.cc        |  75 -------
 src/parquet/thrift/util.h                    |  11 +-
 src/parquet/util/macros.h                    |   5 +
 src/parquet/util/output.h                    |   4 +
 18 files changed, 320 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index f93f232..24d2a20 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,6 @@ addons:
     - bison                        #needed for thrift cpp compilation
     - flex                         #needed for thrift cpp compilation
     - pkg-config                   #needed for thrift cpp compilation
-cache:
-  directories:
-  - $TRAVIS_BUILD_DIR/parquet-build
 
 matrix:
   include:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c853993..218e74a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -231,11 +231,11 @@ set(CXX_FLAGS_RELEASE "-O3 -g")
 string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
 
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_DEBUG})
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_FASTDEBUG})
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_RELEASE})
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
 else()
   message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
 endif ()

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 62188db..0e3c20f 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <cstdint>
+#include <memory>
 #include <vector>
 #include <string>
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 3308a1c..916fd12 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -24,6 +24,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
 
 #include "parquet/types.h"
 
@@ -93,13 +94,26 @@ class DataPage : public Page {
     return definition_level_encoding_;
   }
 
+  // DataPageHeader::statistics::max field, if it was set
+  const uint8_t* max() const {
+    return reinterpret_cast<const uint8_t*>(max_.c_str());
+  }
+
+  // DataPageHeader::statistics::min field, if it was set
+  const uint8_t* min() const {
+    return reinterpret_cast<const uint8_t*>(min_.c_str());
+  }
+
  private:
   int32_t num_values_;
   Encoding::type encoding_;
   Encoding::type definition_level_encoding_;
   Encoding::type repetition_level_encoding_;
 
-  // TODO(wesm): parquet::DataPageHeader.statistics
+  // So max/min can be populated privately
+  friend class SerializedPageReader;
+  std::string max_;
+  std::string min_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index b346fc2..b12f340 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -32,7 +32,6 @@
 
 // Depended on by SerializedPageReader test utilities for now
 #include "parquet/encodings/plain-encoding.h"
-#include "parquet/thrift/util.h"
 #include "parquet/util/input.h"
 
 namespace parquet_cpp {
@@ -195,34 +194,6 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
 
 } // namespace test
 
-// Utilities for testing the SerializedPageReader internally
-
-static inline void InitDataPage(const parquet::Statistics& stat,
-    parquet::DataPageHeader& data_page, int32_t nvalues) {
-  data_page.encoding = parquet::Encoding::PLAIN;
-  data_page.definition_level_encoding = parquet::Encoding::RLE;
-  data_page.repetition_level_encoding = parquet::Encoding::RLE;
-  data_page.num_values = nvalues;
-  data_page.__set_statistics(stat);
-}
-
-static inline void InitStats(size_t stat_size, parquet::Statistics& stat) {
-  std::vector<char> stat_buffer;
-  stat_buffer.resize(stat_size);
-  for (int i = 0; i < stat_size; i++) {
-    (reinterpret_cast<uint8_t*>(stat_buffer.data()))[i] = i % 255;
-  }
-  stat.__set_max(std::string(stat_buffer.data(), stat_size));
-}
-
-static inline void InitPageHeader(const parquet::DataPageHeader &data_page,
-    parquet::PageHeader& page_header) {
-  page_header.__set_data_page_header(data_page);
-  page_header.uncompressed_page_size = 0;
-  page_header.compressed_page_size = 0;
-  page_header.type = parquet::PageType::DATA_PAGE;
-}
-
 } // namespace parquet_cpp
 
 #endif // PARQUET_COLUMN_TEST_UTIL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt
index 2c0b67c..f0ee110 100644
--- a/src/parquet/compression/CMakeLists.txt
+++ b/src/parquet/compression/CMakeLists.txt
@@ -16,6 +16,7 @@
 # under the License.
 
 add_library(parquet_compression STATIC
+  codec.cc
   lz4-codec.cc
   snappy-codec.cc
   gzip-codec.cc

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.cc b/src/parquet/compression/codec.cc
new file mode 100644
index 0000000..60d308e
--- /dev/null
+++ b/src/parquet/compression/codec.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet_cpp {
+
+std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) {
+  std::unique_ptr<Codec> result;
+  switch (codec_type) {
+    case Compression::UNCOMPRESSED:
+      break;
+    case Compression::SNAPPY:
+      result.reset(new SnappyCodec());
+      break;
+    case Compression::GZIP:
+      result.reset(new GZipCodec());
+      break;
+    case Compression::LZO:
+      ParquetException::NYI("LZO codec not implemented");
+      break;
+    default:
+      ParquetException::NYI("Unrecognized codec");
+      break;
+  }
+  return result;
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index 8fc4ada..bc73f02 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -19,16 +19,21 @@
 #define PARQUET_COMPRESSION_CODEC_H
 
 #include <cstdint>
+#include <memory>
 
 #include <zlib.h>
 
 #include "parquet/exception.h"
+#include "parquet/types.h"
 
 namespace parquet_cpp {
 
 class Codec {
  public:
   virtual ~Codec() {}
+
+  static std::unique_ptr<Codec> Create(Compression::type codec);
+
   virtual void Decompress(int64_t input_len, const uint8_t* input,
       int64_t output_len, uint8_t* output_buffer) = 0;
 
@@ -80,6 +85,7 @@ class GZipCodec : public Codec {
   };
 
   explicit GZipCodec(Format format = GZIP);
+  virtual ~GZipCodec();
 
   virtual void Decompress(int64_t input_len, const uint8_t* input,
       int64_t output_len, uint8_t* output_buffer);
@@ -109,6 +115,8 @@ class GZipCodec : public Codec {
   // perform the refactoring then
   void InitCompressor();
   void InitDecompressor();
+  void EndCompressor();
+  void EndDecompressor();
   bool compressor_initialized_;
   bool decompressor_initialized_;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/gzip-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc
index 6ec2726..f48fdad 100644
--- a/src/parquet/compression/gzip-codec.cc
+++ b/src/parquet/compression/gzip-codec.cc
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/compression/codec.h"
-
 #include <cstring>
 #include <sstream>
+#include <string>
+
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
 
 namespace parquet_cpp {
 
@@ -40,7 +42,13 @@ GZipCodec::GZipCodec(Format format) :
     decompressor_initialized_(false) {
 }
 
+GZipCodec::~GZipCodec() {
+  EndCompressor();
+  EndDecompressor();
+}
+
 void GZipCodec::InitCompressor() {
+  EndDecompressor();
   memset(&stream_, 0, sizeof(stream_));
 
   int ret;
@@ -58,12 +66,18 @@ void GZipCodec::InitCompressor() {
   }
 
   compressor_initialized_ = true;
-  decompressor_initialized_ = false;
+}
+
+void GZipCodec::EndCompressor() {
+  if (compressor_initialized_) {
+    (void)deflateEnd(&stream_);
+  }
+  compressor_initialized_ = false;
 }
 
 void GZipCodec::InitDecompressor() {
+  EndCompressor();
   memset(&stream_, 0, sizeof(stream_));
-
   int ret;
 
   // Initialize to run either deflate or zlib/gzip format
@@ -71,11 +85,16 @@ void GZipCodec::InitDecompressor() {
   if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
     throw ParquetException("zlib inflateInit failed: " +  std::string(stream_.msg));
   }
-
-  compressor_initialized_ = false;
   decompressor_initialized_ = true;
 }
 
+void GZipCodec::EndDecompressor() {
+  if (decompressor_initialized_) {
+    (void)inflateEnd(&stream_);
+  }
+  decompressor_initialized_ = false;
+}
+
 void GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
     int64_t output_length, uint8_t* output) {
   if (!decompressor_initialized_) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index b8ef13b..5091dc8 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -17,11 +17,13 @@
 
 #include <cstdint>
 #include <cstdlib>
+#include <cstring>
 #include <string>
 #include <vector>
 
 #include <gtest/gtest.h>
 
+#include "parquet/schema/descriptor.h"
 #include "parquet/encodings/plain-encoding.h"
 #include "parquet/types.h"
 #include "parquet/schema/types.h"
@@ -80,7 +82,7 @@ class EncodeDecode{
 
   void generate_data() {
     // seed the prng so failure is deterministic
-    random_numbers(num_values_, 0.5, draws_);
+    random_numbers(num_values_, 0, draws_);
   }
 
   void encode_decode(ColumnDescriptor *d) {
@@ -141,7 +143,7 @@ void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
   int max_byte_array_len = 12 + sizeof(uint32_t);
   size_t nbytes = num_values_ * max_byte_array_len;
   data_buffer_.resize(nbytes);
-  random_byte_array(num_values_, 0.5, data_buffer_.data(), draws_,
+  random_byte_array(num_values_, 0, data_buffer_.data(), draws_,
       max_byte_array_len);
 }
 
@@ -160,7 +162,7 @@ void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
   size_t nbytes = num_values_ * flba_length;
   data_buffer_.resize(nbytes);
   ASSERT_EQ(nbytes, data_buffer_.size());
-  random_fixed_byte_array(num_values_, 0.5, data_buffer_.data(), flba_length, draws_);
+  random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_);
 }
 
 template<>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index e90889d..cfb3e86 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -20,92 +20,224 @@
 #include <algorithm>
 #include <cstdlib>
 #include <cstdint>
+#include <cstring>
 #include <exception>
 #include <memory>
 #include <string>
+#include <vector>
 
 #include "parquet/column/page.h"
-#include "parquet/column/test-util.h"
-
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/thrift/parquet_types.h"
 #include "parquet/thrift/util.h"
 #include "parquet/types.h"
 #include "parquet/util/input.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
 
 namespace parquet_cpp {
 
-class TestSerializedPage : public ::testing::Test {
+
+// Adds page statistics occupying a certain amount of bytes (for testing very
+// large page headers)
+static inline void AddDummyStats(size_t stat_size,
+    parquet::DataPageHeader& data_page) {
+
+  std::vector<uint8_t> stat_bytes(stat_size);
+  // Some non-zero value
+  std::fill(stat_bytes.begin(), stat_bytes.end(), 1);
+  data_page.statistics.__set_max(std::string(
+          reinterpret_cast<const char*>(stat_bytes.data()), stat_size));
+  data_page.__isset.statistics = true;
+}
+
+class TestPageSerde : public ::testing::Test {
  public:
-  void InitSerializedPageReader(const uint8_t* buffer, size_t header_size,
-      Compression::type codec) {
+  void SetUp() {
+    data_page_header_.encoding = parquet::Encoding::PLAIN;
+    data_page_header_.definition_level_encoding = parquet::Encoding::RLE;
+    data_page_header_.repetition_level_encoding = parquet::Encoding::RLE;
+
+    ResetStream();
+  }
+
+  void InitSerializedPageReader(Compression::type codec =
+      Compression::UNCOMPRESSED) {
+    EndStream();
     std::unique_ptr<InputStream> stream;
-    stream.reset(new InMemoryInputStream(buffer, header_size));
+    stream.reset(new InMemoryInputStream(out_buffer_.data(),
+            out_buffer_.size()));
     page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
   }
 
+  void WriteDataPageHeader(int max_serialized_len = 1024,
+      int32_t uncompressed_size = 0, int32_t compressed_size = 0) {
+    // Simplifying writing serialized data page headers which may or may not
+    // have meaningful data associated with them
+
+    // Serialize the Page header
+    uint32_t serialized_len = max_serialized_len;
+    page_header_.__set_data_page_header(data_page_header_);
+    page_header_.uncompressed_page_size = uncompressed_size;
+    page_header_.compressed_page_size = compressed_size;
+    page_header_.type = parquet::PageType::DATA_PAGE;
+
+    ASSERT_NO_THROW(SerializeThriftMsg(&page_header_, max_serialized_len,
+          out_stream_.get()));
+  }
+
+  void ResetStream() {
+    out_buffer_.resize(0);
+    out_stream_.reset(new InMemoryOutputStream());
+  }
+
+  void EndStream() {
+    out_stream_->Transfer(&out_buffer_);
+  }
+
  protected:
+  std::unique_ptr<InMemoryOutputStream> out_stream_;
+
+  // TODO(wesm): Owns the results of the output stream. To be refactored
+  std::vector<uint8_t> out_buffer_;
+
   std::unique_ptr<SerializedPageReader> page_reader_;
+  parquet::PageHeader page_header_;
+  parquet::DataPageHeader data_page_header_;
 };
 
-TEST_F(TestSerializedPage, TestLargePageHeaders) {
-  parquet::PageHeader in_page_header;
-  parquet::DataPageHeader data_page_header;
+void CheckDataPageHeader(const parquet::DataPageHeader expected,
+    const Page* page) {
+  ASSERT_EQ(PageType::DATA_PAGE, page->type());
+
+  const DataPage* data_page = static_cast<const DataPage*>(page);
+  ASSERT_EQ(expected.num_values, data_page->num_values());
+  ASSERT_EQ(expected.encoding, data_page->encoding());
+  ASSERT_EQ(expected.definition_level_encoding,
+      data_page->definition_level_encoding());
+  ASSERT_EQ(expected.repetition_level_encoding,
+      data_page->repetition_level_encoding());
+
+  if (expected.statistics.__isset.max) {
+    ASSERT_EQ(0, memcmp(expected.statistics.max.c_str(),
+            data_page->max(), expected.statistics.max.length()));
+  }
+  if (expected.statistics.__isset.min) {
+    ASSERT_EQ(0, memcmp(expected.statistics.min.c_str(),
+            data_page->min(), expected.statistics.min.length()));
+  }
+}
+
+TEST_F(TestPageSerde, DataPage) {
   parquet::PageHeader out_page_header;
-  parquet::Statistics stats;
-  int expected_header_size = 512 * 1024; //512 KB
+
+  int stats_size = 512;
+  AddDummyStats(stats_size, data_page_header_);
+  data_page_header_.num_values = 4444;
+
+  WriteDataPageHeader();
+  InitSerializedPageReader();
+  std::shared_ptr<Page> current_page = page_reader_->NextPage();
+  CheckDataPageHeader(data_page_header_, current_page.get());
+}
+
+TEST_F(TestPageSerde, TestLargePageHeaders) {
   int stats_size = 256 * 1024; // 256 KB
-  std::string serialized_buffer;
-  int num_values = 4141;
+  AddDummyStats(stats_size, data_page_header_);
 
-  InitStats(stats_size, stats);
-  InitDataPage(stats, data_page_header, num_values);
-  InitPageHeader(data_page_header, in_page_header);
+  // Any number to verify metadata roundtrip
+  data_page_header_.num_values = 4141;
 
-  // Serialize the Page header
-  ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
-      expected_header_size));
-  // check header size is between 256 KB to 16 MB
-  ASSERT_LE(stats_size, serialized_buffer.length());
-  ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
+  int max_header_size = 512 * 1024; // 512 KB
+  WriteDataPageHeader(max_header_size);
+  ASSERT_GE(max_header_size, out_stream_->Tell());
 
-  InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
-      serialized_buffer.length(), Compression::UNCOMPRESSED);
+  // check header size is between 256 KB to 16 MB
+  ASSERT_LE(stats_size, out_stream_->Tell());
+  ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell());
 
+  InitSerializedPageReader();
   std::shared_ptr<Page> current_page = page_reader_->NextPage();
-  ASSERT_EQ(PageType::DATA_PAGE, current_page->type());
-  const DataPage* page = static_cast<const DataPage*>(current_page.get());
-  ASSERT_EQ(num_values, page->num_values());
+  CheckDataPageHeader(data_page_header_, current_page.get());
 }
 
-TEST_F(TestSerializedPage, TestFailLargePageHeaders) {
-  parquet::PageHeader in_page_header;
-  parquet::DataPageHeader data_page_header;
-  parquet::PageHeader out_page_header;
-  parquet::Statistics stats;
-  int expected_header_size = 512 * 1024; // 512 KB
+TEST_F(TestPageSerde, TestFailLargePageHeaders) {
   int stats_size = 256 * 1024; // 256 KB
-  int max_header_size = 128 * 1024; // 128 KB
-  int num_values = 4141;
-  std::string serialized_buffer;
-
-  InitStats(stats_size, stats);
-  InitDataPage(stats, data_page_header, num_values);
-  InitPageHeader(data_page_header, in_page_header);
+  AddDummyStats(stats_size, data_page_header_);
 
   // Serialize the Page header
-  ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
-      expected_header_size));
-  // check header size is between 256 KB to 16 MB
-  ASSERT_LE(stats_size, serialized_buffer.length());
-  ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
+  int max_header_size = 512 * 1024; // 512 KB
+  WriteDataPageHeader(max_header_size);
+  ASSERT_GE(max_header_size, out_stream_->Tell());
 
-  InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
-      serialized_buffer.length(), Compression::UNCOMPRESSED);
-
-  // Set the max page header size to 128 KB, which is less than the current header size
-  page_reader_->set_max_page_header_size(max_header_size);
+  int smaller_max_size = 128 * 1024;
+  ASSERT_LE(smaller_max_size, out_stream_->Tell());
+  InitSerializedPageReader();
 
+  // Set the max page header size to 128 KB, which is less than the current
+  // header size
+  page_reader_->set_max_page_header_size(smaller_max_size);
   ASSERT_THROW(page_reader_->NextPage(), ParquetException);
 }
+
+TEST_F(TestPageSerde, Compression) {
+  Compression::type codec_types[2] = {Compression::GZIP, Compression::SNAPPY};
+
+  // This is a dummy number
+  data_page_header_.num_values = 32;
+
+  int num_pages = 10;
+
+  std::vector<std::vector<uint8_t> > faux_data;
+  faux_data.resize(num_pages);
+  for (int i = 0; i < num_pages; ++i) {
+    // The pages keep getting larger
+    int page_size = (i + 1) * 64;
+    test::random_bytes(page_size, 0, &faux_data[i]);
+  }
+  for (auto codec_type : codec_types) {
+    std::unique_ptr<Codec> codec = Codec::Create(codec_type);
+
+    std::vector<uint8_t> buffer;
+    for (int i = 0; i < num_pages; ++i) {
+      const uint8_t* data = faux_data[i].data();
+      size_t data_size = faux_data[i].size();
+
+      int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data);
+      buffer.resize(max_compressed_size);
+
+      int64_t actual_size = codec->Compress(data_size, data,
+          max_compressed_size, &buffer[0]);
+
+      WriteDataPageHeader(1024, data_size, actual_size);
+      out_stream_->Write(buffer.data(), actual_size);
+    }
+
+    InitSerializedPageReader(codec_type);
+
+    std::shared_ptr<Page> page;
+    const DataPage* data_page;
+    for (int i = 0; i < num_pages; ++i) {
+      size_t data_size = faux_data[i].size();
+      page = page_reader_->NextPage();
+      data_page = static_cast<const DataPage*>(page.get());
+      ASSERT_EQ(data_size, data_page->size());
+      ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size));
+    }
+
+    ResetStream();
+  }
+}
+
+TEST_F(TestPageSerde, LZONotSupported) {
+  // Must await PARQUET-530
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+  WriteDataPageHeader(1024, data_size, data_size);
+  out_stream_->Write(faux_data.data(), data_size);
+  ASSERT_THROW(InitSerializedPageReader(Compression::LZO), ParquetException);
+}
+
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 47092a5..0a93b00 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <exception>
 #include <ostream>
+#include <string>
 #include <vector>
 
 #include "parquet/column/page.h"
@@ -40,22 +41,10 @@ namespace parquet_cpp {
 // assembled in a serialized stream for storing in a Parquet files
 
 SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
-    Compression::type codec) :
+    Compression::type codec_type) :
     stream_(std::move(stream)) {
   max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
-  // TODO(wesm): add GZIP after PARQUET-456
-  switch (codec) {
-    case Compression::UNCOMPRESSED:
-      break;
-    case Compression::SNAPPY:
-      decompressor_.reset(new SnappyCodec());
-      break;
-    case Compression::LZO:
-      decompressor_.reset(new Lz4Codec());
-      break;
-    default:
-      ParquetException::NYI("Reading compressed data");
-  }
+  decompressor_ = Codec::Create(codec_type);
 }
 
 std::shared_ptr<Page> SerializedPageReader::NextPage() {
@@ -126,11 +115,22 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) {
       const parquet::DataPageHeader& header = current_page_header_.data_page_header;
 
-      return std::make_shared<DataPage>(buffer, uncompressed_len,
+      auto page = std::make_shared<DataPage>(buffer, uncompressed_len,
           header.num_values,
           FromThrift(header.encoding),
           FromThrift(header.definition_level_encoding),
           FromThrift(header.repetition_level_encoding));
+
+      if (header.__isset.statistics) {
+        const parquet::Statistics stats = header.statistics;
+        if (stats.__isset.max) {
+          page->max_ = stats.max;
+        }
+        if (stats.__isset.min) {
+          page->min_ = stats.min;
+        }
+      }
+      return page;
     } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) {
       const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
       bool is_compressed = header.__isset.is_compressed? header.is_compressed : false;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index c63df54..83b136d 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -27,6 +27,7 @@
 #include "parquet/exception.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/schema/types.h"
+#include "parquet/types.h"
 
 using std::string;
 using std::vector;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/CMakeLists.txt b/src/parquet/thrift/CMakeLists.txt
index 29b8ef8..f43c2a5 100644
--- a/src/parquet/thrift/CMakeLists.txt
+++ b/src/parquet/thrift/CMakeLists.txt
@@ -44,5 +44,3 @@ add_custom_command(
   COMMENT "Running thrift compiler on parquet.thrift"
   VERBATIM
 )
-
-ADD_PARQUET_TEST(serializer-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/serializer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/serializer-test.cc b/src/parquet/thrift/serializer-test.cc
deleted file mode 100644
index 756fd10..0000000
--- a/src/parquet/thrift/serializer-test.cc
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <cstdint>
-#include <exception>
-#include <string>
-
-#include "parquet/column/test-util.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/thrift/util.h"
-
-using std::string;
-
-namespace parquet_cpp {
-
-class TestThrift : public ::testing::Test {
-
-};
-
-TEST_F(TestThrift, TestSerializerDeserializer) {
-  parquet::PageHeader in_page_header;
-  parquet::DataPageHeader data_page_header;
-  parquet::PageHeader out_page_header;
-  parquet::Statistics stats;
-  uint32_t max_header_len = 1024;
-  uint32_t expected_header_size = 1024;
-  uint32_t stats_size = 512;
-  std::string serialized_buffer;
-  int num_values = 4444;
-
-  InitStats(stats_size, stats);
-  InitDataPage(stats, data_page_header, num_values);
-  InitPageHeader(data_page_header, in_page_header);
-
-  // Serialize the Page header
-  ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, expected_header_size));
-  ASSERT_LE(stats_size, serialized_buffer.length());
-  ASSERT_GE(max_header_len, serialized_buffer.length());
-
-  uint32_t header_size = 1024;
-  // Deserialize the serialized page buffer
-  ASSERT_NO_THROW(DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
-      &header_size, &out_page_header));
-  ASSERT_LE(stats_size, header_size);
-  ASSERT_GE(max_header_len, header_size);
-
-  ASSERT_EQ(parquet::Encoding::PLAIN, out_page_header.data_page_header.encoding);
-  ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding);
-  ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.repetition_level_encoding);
-  for(int i = 0; i < stats_size; i++){
-    EXPECT_EQ(i % 255, (reinterpret_cast<const uint8_t*>
-        (out_page_header.data_page_header.statistics.max.c_str()))[i]);
-  }
-  ASSERT_EQ(parquet::PageType::DATA_PAGE, out_page_header.type);
-  ASSERT_EQ(num_values, out_page_header.data_page_header.num_values);
-
-}
-
-} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 8c34197..5f29820 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -18,8 +18,9 @@
 #include <sstream>
 
 #include "parquet/exception.h"
-#include "parquet/util/logging.h"
 #include "parquet/thrift/parquet_types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/output.h"
 
 namespace parquet_cpp {
 
@@ -77,7 +78,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
 // The arguments are the object to be serialized and
 // the expected size of the serialized object
 template <class T>
-inline std::string SerializeThriftMsg(T* obj, uint32_t len) {
+inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
       new apache::thrift::transport::TMemoryBuffer(len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
@@ -92,7 +93,11 @@ inline std::string SerializeThriftMsg(T* obj, uint32_t len) {
     ss << "Couldn't serialize thrift: " << e.what() << "\n";
     throw ParquetException(ss.str());
   }
-  return mem_buffer->getBufferAsString();
+
+  uint8_t* out_buffer;
+  uint32_t out_length;
+  mem_buffer->getBuffer(&out_buffer, &out_length);
+  out->Write(out_buffer, out_length);
 }
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/util/macros.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/macros.h b/src/parquet/util/macros.h
index 7b301d6..d221173 100644
--- a/src/parquet/util/macros.h
+++ b/src/parquet/util/macros.h
@@ -20,6 +20,11 @@
 
 // Useful macros from elsewhere
 
+// From Google gutil
+#define DISALLOW_COPY_AND_ASSIGN(TypeName)      \
+  TypeName(const TypeName&) = delete;           \
+  void operator=(const TypeName&) = delete
+
 // ----------------------------------------------------------------------
 // From googletest
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index be25abd..68a09e2 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -21,6 +21,8 @@
 #include <cstdint>
 #include <vector>
 
+#include "parquet/util/macros.h"
+
 namespace parquet_cpp {
 
 // ----------------------------------------------------------------------
@@ -63,6 +65,8 @@ class InMemoryOutputStream : public OutputStream {
   std::vector<uint8_t> buffer_;
   int64_t size_;
   int64_t capacity_;
+
+  DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
 };
 
 } // namespace parquet_cpp


Mime
View raw message