From: julien@apache.org
To: commits@parquet.apache.org
Subject: parquet-cpp git commit: PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, related refactoring
Date: Sun, 21 Feb 2016 00:11:59 +0000 (UTC)

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 9cab887f2 -> 891985439


PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, related refactoring

This also restores passing on the user's `CMAKE_CXX_FLAGS`; dropping them had
unfortunately allowed some compiler warnings to creep into our build.

Author: Wes McKinney

Closes #58 from wesm/PARQUET-457 and squashes the following commits:

4bf12ed [Wes McKinney] * SerializeThriftMsg now writes into an OutputStream.
* Refactor page serialization in advance of compression tests
* Test compression roundtrip on random bytes for snappy and gzip
* Trying LZO compression results in ParquetException
* Don't lose user's CMAKE_CXX_FLAGS
* Remove Travis CI directory caching for now
* Fix gzip memory leak if you do not call inflateEnd, deflateEnd


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/89198543
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/89198543
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/89198543

Branch: refs/heads/master
Commit: 89198543987ea8830501f35ba11581bf3a1b5a03
Parents: 9cab887
Author: Wes McKinney
Authored: Sat Feb 20 16:11:41 2016 -0800
Committer: Julien Le Dem
Committed: Sat Feb 20 16:11:41 2016 -0800

----------------------------------------------------------------------
 .travis.yml                                  |   3 -
 CMakeLists.txt                               |   6 +-
 src/parquet/column/levels-test.cc            |   1 +
 src/parquet/column/page.h                    |  16 +-
 src/parquet/column/test-util.h               |  29 ---
 src/parquet/compression/CMakeLists.txt       |   1 +
 src/parquet/compression/codec.cc             |  47 +++++
 src/parquet/compression/codec.h              |   8 +
 src/parquet/compression/gzip-codec.cc        |  31 ++-
 src/parquet/encodings/plain-encoding-test.cc |   8 +-
 src/parquet/file/file-deserialize-test.cc    | 232 +++++++++++++++++-----
 src/parquet/file/reader-internal.cc          |  30 +--
 src/parquet/schema/schema-descriptor-test.cc |   1 +
 src/parquet/thrift/CMakeLists.txt            |   2 -
 src/parquet/thrift/serializer-test.cc        |  75 -------
 src/parquet/thrift/util.h                    |  11 +-
 src/parquet/util/macros.h                    |   5 +
 src/parquet/util/output.h                    |   4 +
 18 files changed, 320 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index f93f232..24d2a20 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,6 @@ addons:
     - bison #needed for thrift cpp compilation
     - flex #needed for thrift cpp compilation
     - pkg-config #needed for thrift cpp compilation
-cache:
-  directories:
-    - $TRAVIS_BUILD_DIR/parquet-build
 matrix:
   include:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c853993..218e74a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -231,11 +231,11 @@ set(CXX_FLAGS_RELEASE "-O3 -g")
 string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_DEBUG})
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_FASTDEBUG})
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_RELEASE})
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
 else()
   message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
 endif ()

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 62188db..0e3c20f 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -16,6 +16,7
@@ // under the License. #include +#include #include #include http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index 3308a1c..916fd12 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -24,6 +24,7 @@ #include #include +#include #include "parquet/types.h" @@ -93,13 +94,26 @@ class DataPage : public Page { return definition_level_encoding_; } + // DataPageHeader::statistics::max field, if it was set + const uint8_t* max() const { + return reinterpret_cast(max_.c_str()); + } + + // DataPageHeader::statistics::min field, if it was set + const uint8_t* min() const { + return reinterpret_cast(min_.c_str()); + } + private: int32_t num_values_; Encoding::type encoding_; Encoding::type definition_level_encoding_; Encoding::type repetition_level_encoding_; - // TODO(wesm): parquet::DataPageHeader.statistics + // So max/min can be populated privately + friend class SerializedPageReader; + std::string max_; + std::string min_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index b346fc2..b12f340 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -32,7 +32,6 @@ // Depended on by SerializedPageReader test utilities for now #include "parquet/encodings/plain-encoding.h" -#include "parquet/thrift/util.h" #include "parquet/util/input.h" namespace parquet_cpp { @@ -195,34 +194,6 @@ static std::shared_ptr MakeDataPage(const std::vector& values, } // namespace test -// Utilities for testing the SerializedPageReader internally - -static inline void InitDataPage(const parquet::Statistics& stat, - parquet::DataPageHeader& data_page, int32_t nvalues) { - data_page.encoding = parquet::Encoding::PLAIN; - data_page.definition_level_encoding = parquet::Encoding::RLE; - data_page.repetition_level_encoding = parquet::Encoding::RLE; - data_page.num_values = nvalues; - data_page.__set_statistics(stat); -} - -static inline void InitStats(size_t stat_size, parquet::Statistics& stat) { - std::vector stat_buffer; - stat_buffer.resize(stat_size); - for (int i = 0; i < stat_size; i++) { - (reinterpret_cast(stat_buffer.data()))[i] = i % 255; - } - stat.__set_max(std::string(stat_buffer.data(), stat_size)); -} - -static inline void InitPageHeader(const parquet::DataPageHeader &data_page, - parquet::PageHeader& page_header) { - page_header.__set_data_page_header(data_page); - page_header.uncompressed_page_size = 0; - page_header.compressed_page_size = 0; - page_header.type = parquet::PageType::DATA_PAGE; -} - } // namespace parquet_cpp #endif // PARQUET_COLUMN_TEST_UTIL_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt index 2c0b67c..f0ee110 100644 --- a/src/parquet/compression/CMakeLists.txt +++ b/src/parquet/compression/CMakeLists.txt @@ -16,6 +16,7 @@ # under the License. 
add_library(parquet_compression STATIC + codec.cc lz4-codec.cc snappy-codec.cc gzip-codec.cc http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.cc b/src/parquet/compression/codec.cc new file mode 100644 index 0000000..60d308e --- /dev/null +++ b/src/parquet/compression/codec.cc @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "parquet/compression/codec.h" +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet_cpp { + +std::unique_ptr Codec::Create(Compression::type codec_type) { + std::unique_ptr result; + switch (codec_type) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + result.reset(new SnappyCodec()); + break; + case Compression::GZIP: + result.reset(new GZipCodec()); + break; + case Compression::LZO: + ParquetException::NYI("LZO codec not implemented"); + break; + default: + ParquetException::NYI("Unrecognized codec"); + break; + } + return result; +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/codec.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h index 8fc4ada..bc73f02 100644 --- a/src/parquet/compression/codec.h +++ b/src/parquet/compression/codec.h @@ -19,16 +19,21 @@ #define PARQUET_COMPRESSION_CODEC_H #include +#include #include #include "parquet/exception.h" +#include "parquet/types.h" namespace parquet_cpp { class Codec { public: virtual ~Codec() {} + + static std::unique_ptr Create(Compression::type codec); + virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) = 0; @@ -80,6 +85,7 @@ class GZipCodec : public Codec { }; explicit GZipCodec(Format format = GZIP); + virtual ~GZipCodec(); virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer); @@ -109,6 +115,8 @@ class GZipCodec : public Codec { // perform the refactoring then void InitCompressor(); void InitDecompressor(); + void EndCompressor(); + void EndDecompressor(); bool compressor_initialized_; bool decompressor_initialized_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/gzip-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc index 6ec2726..f48fdad 100644 --- a/src/parquet/compression/gzip-codec.cc +++ b/src/parquet/compression/gzip-codec.cc @@ -15,10 +15,12 @@ // specific 
language governing permissions and limitations // under the License. -#include "parquet/compression/codec.h" - #include #include +#include + +#include "parquet/compression/codec.h" +#include "parquet/exception.h" namespace parquet_cpp { @@ -40,7 +42,13 @@ GZipCodec::GZipCodec(Format format) : decompressor_initialized_(false) { } +GZipCodec::~GZipCodec() { + EndCompressor(); + EndDecompressor(); +} + void GZipCodec::InitCompressor() { + EndDecompressor(); memset(&stream_, 0, sizeof(stream_)); int ret; @@ -58,12 +66,18 @@ void GZipCodec::InitCompressor() { } compressor_initialized_ = true; - decompressor_initialized_ = false; +} + +void GZipCodec::EndCompressor() { + if (compressor_initialized_) { + (void)deflateEnd(&stream_); + } + compressor_initialized_ = false; } void GZipCodec::InitDecompressor() { + EndCompressor(); memset(&stream_, 0, sizeof(stream_)); - int ret; // Initialize to run either deflate or zlib/gzip format @@ -71,11 +85,16 @@ void GZipCodec::InitDecompressor() { if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); } - - compressor_initialized_ = false; decompressor_initialized_ = true; } +void GZipCodec::EndDecompressor() { + if (decompressor_initialized_) { + (void)inflateEnd(&stream_); + } + decompressor_initialized_ = false; +} + void GZipCodec::Decompress(int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { if (!decompressor_initialized_) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/encodings/plain-encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc index b8ef13b..5091dc8 100644 --- a/src/parquet/encodings/plain-encoding-test.cc +++ b/src/parquet/encodings/plain-encoding-test.cc @@ -17,11 +17,13 @@ #include #include +#include #include #include #include +#include "parquet/schema/descriptor.h" #include "parquet/encodings/plain-encoding.h" #include "parquet/types.h" #include "parquet/schema/types.h" @@ -80,7 +82,7 @@ class EncodeDecode{ void generate_data() { // seed the prng so failure is deterministic - random_numbers(num_values_, 0.5, draws_); + random_numbers(num_values_, 0, draws_); } void encode_decode(ColumnDescriptor *d) { @@ -141,7 +143,7 @@ void EncodeDecode::generate_data() { int max_byte_array_len = 12 + sizeof(uint32_t); size_t nbytes = num_values_ * max_byte_array_len; data_buffer_.resize(nbytes); - random_byte_array(num_values_, 0.5, data_buffer_.data(), draws_, + random_byte_array(num_values_, 0, data_buffer_.data(), draws_, max_byte_array_len); } @@ -160,7 +162,7 @@ void EncodeDecode::generate_data() { size_t nbytes = num_values_ * flba_length; data_buffer_.resize(nbytes); ASSERT_EQ(nbytes, data_buffer_.size()); - random_fixed_byte_array(num_values_, 0.5, data_buffer_.data(), flba_length, draws_); + random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_); } template<> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/file/file-deserialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc index e90889d..cfb3e86 100644 --- a/src/parquet/file/file-deserialize-test.cc +++ b/src/parquet/file/file-deserialize-test.cc @@ -20,92 +20,224 @@ #include #include #include +#include #include #include 
#include +#include #include "parquet/column/page.h" -#include "parquet/column/test-util.h" - +#include "parquet/compression/codec.h" +#include "parquet/exception.h" #include "parquet/file/reader-internal.h" #include "parquet/thrift/parquet_types.h" #include "parquet/thrift/util.h" #include "parquet/types.h" #include "parquet/util/input.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" namespace parquet_cpp { -class TestSerializedPage : public ::testing::Test { + +// Adds page statistics occupying a certain amount of bytes (for testing very +// large page headers) +static inline void AddDummyStats(size_t stat_size, + parquet::DataPageHeader& data_page) { + + std::vector stat_bytes(stat_size); + // Some non-zero value + std::fill(stat_bytes.begin(), stat_bytes.end(), 1); + data_page.statistics.__set_max(std::string( + reinterpret_cast(stat_bytes.data()), stat_size)); + data_page.__isset.statistics = true; +} + +class TestPageSerde : public ::testing::Test { public: - void InitSerializedPageReader(const uint8_t* buffer, size_t header_size, - Compression::type codec) { + void SetUp() { + data_page_header_.encoding = parquet::Encoding::PLAIN; + data_page_header_.definition_level_encoding = parquet::Encoding::RLE; + data_page_header_.repetition_level_encoding = parquet::Encoding::RLE; + + ResetStream(); + } + + void InitSerializedPageReader(Compression::type codec = + Compression::UNCOMPRESSED) { + EndStream(); std::unique_ptr stream; - stream.reset(new InMemoryInputStream(buffer, header_size)); + stream.reset(new InMemoryInputStream(out_buffer_.data(), + out_buffer_.size())); page_reader_.reset(new SerializedPageReader(std::move(stream), codec)); } + void WriteDataPageHeader(int max_serialized_len = 1024, + int32_t uncompressed_size = 0, int32_t compressed_size = 0) { + // Simplifying writing serialized data page headers which may or may not + // have meaningful data associated with them + + // Serialize the Page header + uint32_t serialized_len = max_serialized_len; + page_header_.__set_data_page_header(data_page_header_); + page_header_.uncompressed_page_size = uncompressed_size; + page_header_.compressed_page_size = compressed_size; + page_header_.type = parquet::PageType::DATA_PAGE; + + ASSERT_NO_THROW(SerializeThriftMsg(&page_header_, max_serialized_len, + out_stream_.get())); + } + + void ResetStream() { + out_buffer_.resize(0); + out_stream_.reset(new InMemoryOutputStream()); + } + + void EndStream() { + out_stream_->Transfer(&out_buffer_); + } + protected: + std::unique_ptr out_stream_; + + // TODO(wesm): Owns the results of the output stream. 
To be refactored + std::vector out_buffer_; + std::unique_ptr page_reader_; + parquet::PageHeader page_header_; + parquet::DataPageHeader data_page_header_; }; -TEST_F(TestSerializedPage, TestLargePageHeaders) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; +void CheckDataPageHeader(const parquet::DataPageHeader expected, + const Page* page) { + ASSERT_EQ(PageType::DATA_PAGE, page->type()); + + const DataPage* data_page = static_cast(page); + ASSERT_EQ(expected.num_values, data_page->num_values()); + ASSERT_EQ(expected.encoding, data_page->encoding()); + ASSERT_EQ(expected.definition_level_encoding, + data_page->definition_level_encoding()); + ASSERT_EQ(expected.repetition_level_encoding, + data_page->repetition_level_encoding()); + + if (expected.statistics.__isset.max) { + ASSERT_EQ(0, memcmp(expected.statistics.max.c_str(), + data_page->max(), expected.statistics.max.length())); + } + if (expected.statistics.__isset.min) { + ASSERT_EQ(0, memcmp(expected.statistics.min.c_str(), + data_page->min(), expected.statistics.min.length())); + } +} + +TEST_F(TestPageSerde, DataPage) { parquet::PageHeader out_page_header; - parquet::Statistics stats; - int expected_header_size = 512 * 1024; //512 KB + + int stats_size = 512; + AddDummyStats(stats_size, data_page_header_); + data_page_header_.num_values = 4444; + + WriteDataPageHeader(); + InitSerializedPageReader(); + std::shared_ptr current_page = page_reader_->NextPage(); + CheckDataPageHeader(data_page_header_, current_page.get()); +} + +TEST_F(TestPageSerde, TestLargePageHeaders) { int stats_size = 256 * 1024; // 256 KB - std::string serialized_buffer; - int num_values = 4141; + AddDummyStats(stats_size, data_page_header_); - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - InitPageHeader(data_page_header, in_page_header); + // Any number to verify metadata roundtrip + data_page_header_.num_values = 4141; - // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, - expected_header_size)); - // check header size is between 256 KB to 16 MB - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); + int max_header_size = 512 * 1024; // 512 KB + WriteDataPageHeader(max_header_size); + ASSERT_GE(max_header_size, out_stream_->Tell()); - InitSerializedPageReader(reinterpret_cast(serialized_buffer.c_str()), - serialized_buffer.length(), Compression::UNCOMPRESSED); + // check header size is between 256 KB to 16 MB + ASSERT_LE(stats_size, out_stream_->Tell()); + ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell()); + InitSerializedPageReader(); std::shared_ptr current_page = page_reader_->NextPage(); - ASSERT_EQ(PageType::DATA_PAGE, current_page->type()); - const DataPage* page = static_cast(current_page.get()); - ASSERT_EQ(num_values, page->num_values()); + CheckDataPageHeader(data_page_header_, current_page.get()); } -TEST_F(TestSerializedPage, TestFailLargePageHeaders) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; - parquet::PageHeader out_page_header; - parquet::Statistics stats; - int expected_header_size = 512 * 1024; // 512 KB +TEST_F(TestPageSerde, TestFailLargePageHeaders) { int stats_size = 256 * 1024; // 256 KB - int max_header_size = 128 * 1024; // 128 KB - int num_values = 4141; - std::string serialized_buffer; - - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - 
InitPageHeader(data_page_header, in_page_header); + AddDummyStats(stats_size, data_page_header_); // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, - expected_header_size)); - // check header size is between 256 KB to 16 MB - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); + int max_header_size = 512 * 1024; // 512 KB + WriteDataPageHeader(max_header_size); + ASSERT_GE(max_header_size, out_stream_->Tell()); - InitSerializedPageReader(reinterpret_cast(serialized_buffer.c_str()), - serialized_buffer.length(), Compression::UNCOMPRESSED); - - // Set the max page header size to 128 KB, which is less than the current header size - page_reader_->set_max_page_header_size(max_header_size); + int smaller_max_size = 128 * 1024; + ASSERT_LE(smaller_max_size, out_stream_->Tell()); + InitSerializedPageReader(); + // Set the max page header size to 128 KB, which is less than the current + // header size + page_reader_->set_max_page_header_size(smaller_max_size); ASSERT_THROW(page_reader_->NextPage(), ParquetException); } + +TEST_F(TestPageSerde, Compression) { + Compression::type codec_types[2] = {Compression::GZIP, Compression::SNAPPY}; + + // This is a dummy number + data_page_header_.num_values = 32; + + int num_pages = 10; + + std::vector > faux_data; + faux_data.resize(num_pages); + for (int i = 0; i < num_pages; ++i) { + // The pages keep getting larger + int page_size = (i + 1) * 64; + test::random_bytes(page_size, 0, &faux_data[i]); + } + for (auto codec_type : codec_types) { + std::unique_ptr codec = Codec::Create(codec_type); + + std::vector buffer; + for (int i = 0; i < num_pages; ++i) { + const uint8_t* data = faux_data[i].data(); + size_t data_size = faux_data[i].size(); + + int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data); + buffer.resize(max_compressed_size); + + int64_t actual_size = codec->Compress(data_size, data, + max_compressed_size, &buffer[0]); + + WriteDataPageHeader(1024, data_size, actual_size); + out_stream_->Write(buffer.data(), actual_size); + } + + InitSerializedPageReader(codec_type); + + std::shared_ptr page; + const DataPage* data_page; + for (int i = 0; i < num_pages; ++i) { + size_t data_size = faux_data[i].size(); + page = page_reader_->NextPage(); + data_page = static_cast(page.get()); + ASSERT_EQ(data_size, data_page->size()); + ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size)); + } + + ResetStream(); + } +} + +TEST_F(TestPageSerde, LZONotSupported) { + // Must await PARQUET-530 + int data_size = 1024; + std::vector faux_data(data_size); + WriteDataPageHeader(1024, data_size, data_size); + out_stream_->Write(faux_data.data(), data_size); + ASSERT_THROW(InitSerializedPageReader(Compression::LZO), ParquetException); +} + } // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 47092a5..0a93b00 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "parquet/column/page.h" @@ -40,22 +41,10 @@ namespace parquet_cpp { // assembled in a serialized stream for storing in a Parquet files SerializedPageReader::SerializedPageReader(std::unique_ptr stream, - 
Compression::type codec) : + Compression::type codec_type) : stream_(std::move(stream)) { max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - // TODO(wesm): add GZIP after PARQUET-456 - switch (codec) { - case Compression::UNCOMPRESSED: - break; - case Compression::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - case Compression::LZO: - decompressor_.reset(new Lz4Codec()); - break; - default: - ParquetException::NYI("Reading compressed data"); - } + decompressor_ = Codec::Create(codec_type); } std::shared_ptr SerializedPageReader::NextPage() { @@ -126,11 +115,22 @@ std::shared_ptr SerializedPageReader::NextPage() { } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) { const parquet::DataPageHeader& header = current_page_header_.data_page_header; - return std::make_shared(buffer, uncompressed_len, + auto page = std::make_shared(buffer, uncompressed_len, header.num_values, FromThrift(header.encoding), FromThrift(header.definition_level_encoding), FromThrift(header.repetition_level_encoding)); + + if (header.__isset.statistics) { + const parquet::Statistics stats = header.statistics; + if (stats.__isset.max) { + page->max_ = stats.max; + } + if (stats.__isset.min) { + page->min_ = stats.min; + } + } + return page; } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) { const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; bool is_compressed = header.__isset.is_compressed? header.is_compressed : false; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/schema/schema-descriptor-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc index c63df54..83b136d 100644 --- a/src/parquet/schema/schema-descriptor-test.cc +++ b/src/parquet/schema/schema-descriptor-test.cc @@ -27,6 +27,7 @@ #include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" +#include "parquet/types.h" using std::string; using std::vector; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/CMakeLists.txt b/src/parquet/thrift/CMakeLists.txt index 29b8ef8..f43c2a5 100644 --- a/src/parquet/thrift/CMakeLists.txt +++ b/src/parquet/thrift/CMakeLists.txt @@ -44,5 +44,3 @@ add_custom_command( COMMENT "Running thrift compiler on parquet.thrift" VERBATIM ) - -ADD_PARQUET_TEST(serializer-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/serializer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/serializer-test.cc b/src/parquet/thrift/serializer-test.cc deleted file mode 100644 index 756fd10..0000000 --- a/src/parquet/thrift/serializer-test.cc +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include -#include -#include - -#include "parquet/column/test-util.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/thrift/util.h" - -using std::string; - -namespace parquet_cpp { - -class TestThrift : public ::testing::Test { - -}; - -TEST_F(TestThrift, TestSerializerDeserializer) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; - parquet::PageHeader out_page_header; - parquet::Statistics stats; - uint32_t max_header_len = 1024; - uint32_t expected_header_size = 1024; - uint32_t stats_size = 512; - std::string serialized_buffer; - int num_values = 4444; - - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - InitPageHeader(data_page_header, in_page_header); - - // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, expected_header_size)); - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(max_header_len, serialized_buffer.length()); - - uint32_t header_size = 1024; - // Deserialize the serialized page buffer - ASSERT_NO_THROW(DeserializeThriftMsg(reinterpret_cast(serialized_buffer.c_str()), - &header_size, &out_page_header)); - ASSERT_LE(stats_size, header_size); - ASSERT_GE(max_header_len, header_size); - - ASSERT_EQ(parquet::Encoding::PLAIN, out_page_header.data_page_header.encoding); - ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding); - ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.repetition_level_encoding); - for(int i = 0; i < stats_size; i++){ - EXPECT_EQ(i % 255, (reinterpret_cast - (out_page_header.data_page_header.statistics.max.c_str()))[i]); - } - ASSERT_EQ(parquet::PageType::DATA_PAGE, out_page_header.type); - ASSERT_EQ(num_values, out_page_header.data_page_header.num_values); - -} - -} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/util.h ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h index 8c34197..5f29820 100644 --- a/src/parquet/thrift/util.h +++ b/src/parquet/thrift/util.h @@ -18,8 +18,9 @@ #include #include "parquet/exception.h" -#include "parquet/util/logging.h" #include "parquet/thrift/parquet_types.h" +#include "parquet/util/logging.h" +#include "parquet/util/output.h" namespace parquet_cpp { @@ -77,7 +78,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali // The arguments are the object to be serialized and // the expected size of the serialized object template -inline std::string SerializeThriftMsg(T* obj, uint32_t len) { +inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { boost::shared_ptr mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< @@ -92,7 +93,11 @@ inline std::string SerializeThriftMsg(T* obj, uint32_t len) { ss << "Couldn't serialize thrift: " << e.what() << "\n"; throw ParquetException(ss.str()); } - return mem_buffer->getBufferAsString(); + + 
uint8_t* out_buffer; + uint32_t out_length; + mem_buffer->getBuffer(&out_buffer, &out_length); + out->Write(out_buffer, out_length); } } // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/util/macros.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/macros.h b/src/parquet/util/macros.h index 7b301d6..d221173 100644 --- a/src/parquet/util/macros.h +++ b/src/parquet/util/macros.h @@ -20,6 +20,11 @@ // Useful macros from elsewhere +// From Google gutil +#define DISALLOW_COPY_AND_ASSIGN(TypeName) \ + TypeName(const TypeName&) = delete; \ + void operator=(const TypeName&) = delete + // ---------------------------------------------------------------------- // From googletest http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/util/output.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h index be25abd..68a09e2 100644 --- a/src/parquet/util/output.h +++ b/src/parquet/util/output.h @@ -21,6 +21,8 @@ #include #include +#include "parquet/util/macros.h" + namespace parquet_cpp { // ---------------------------------------------------------------------- @@ -63,6 +65,8 @@ class InMemoryOutputStream : public OutputStream { std::vector buffer_; int64_t size_; int64_t capacity_; + + DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream); }; } // namespace parquet_cpp
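
A minimal sketch (not part of this patch) of the compression roundtrip that the
new TestPageSerde.Compression test exercises, built on the Codec::Create factory
added in src/parquet/compression/codec.cc. The Codec method signatures are read
off the diff above; the CodecRoundTripExample wrapper, buffer sizes, and test
bytes are illustrative assumptions only.

// Illustrative example only; signatures taken from the diff in this commit.
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

#include "parquet/compression/codec.h"
#include "parquet/types.h"

using parquet_cpp::Codec;
using parquet_cpp::Compression;

void CodecRoundTripExample() {
  // Arbitrary input bytes to compress and recover
  std::vector<uint8_t> input(1024, 7);

  // Factory selects SnappyCodec / GZipCodec; LZO throws ParquetException (NYI)
  std::unique_ptr<Codec> codec = Codec::Create(Compression::SNAPPY);

  // Size the output buffer with the codec's worst-case estimate, then compress
  int64_t max_len = codec->MaxCompressedLen(input.size(), input.data());
  std::vector<uint8_t> compressed(max_len);
  int64_t actual_len =
      codec->Compress(input.size(), input.data(), max_len, compressed.data());

  // Decompress into a buffer of the known uncompressed size
  std::vector<uint8_t> output(input.size());
  codec->Decompress(actual_len, compressed.data(), output.size(), output.data());

  // The roundtrip should reproduce the original bytes exactly
  assert(std::memcmp(input.data(), output.data(), input.size()) == 0);
}

SerializedPageReader now relies on the same factory in reader-internal.cc
instead of its previous hand-rolled switch over codec types, so page
decompression and these roundtrip tests share one code path.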