From: wesm@apache.org
To: commits@parquet.apache.org
Subject: parquet-cpp git commit: PARQUET-687: C++: Switch to PLAIN encoding if dictionary grows too large
Date: Thu, 15 Sep 2016 04:04:43 +0000 (UTC)

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 55604b297 -> c6f5ebe52


PARQUET-687: C++: Switch to PLAIN encoding if dictionary grows too large

Implemented dictionary fallback encoding
Added tests
Added a fast path to serialize data pages

Author: Deepak Majeti

Closes #157 from majetideepak/PARQUET-717 and squashes the following commits:

6f51df6 [Deepak Majeti] minor comment fix
c498aeb [Deepak Majeti] modify comment style
eac9114 [Deepak Majeti] clang format
da46033 [Deepak Majeti] added comments and fixed review suggestions
312bad8 [Deepak Majeti] minor changes
dd0cc7e [Deepak Majeti] Add all types to the test
54af38a [Deepak Majeti] clang format
84f360d [Deepak Majeti] added dictionary fallback support with tests


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c6f5ebe5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c6f5ebe5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c6f5ebe5

Branch: refs/heads/master
Commit: c6f5ebe5207c541b691f75cad0eb32e517522201
Parents: 55604b2
Author: Deepak Majeti
Authored: Thu Sep 15 00:04:35 2016 -0400
Committer: Wes McKinney
Committed: Thu Sep 15 00:04:35 2016 -0400

----------------------------------------------------------------------
 src/parquet/column/column-writer-test.cc | 51 +++++++++++++++++++--
 src/parquet/column/page.h                |  5 ++-
 src/parquet/column/properties-test.cc    |  2 +-
 src/parquet/column/properties.h          | 41 +++++++++++------
 src/parquet/column/writer.cc             | 60 ++++++++++++++++++-------
 src/parquet/column/writer.h              | 64 ++++++++++++++++++++++-----
 src/parquet/file/file-metadata-test.cc   |  8 ++--
 src/parquet/file/metadata.cc             | 15 ++++---
 src/parquet/file/metadata.h              |  2 +-
 src/parquet/file/writer-internal.cc      |  6 +--
 src/parquet/file/writer-internal.h       |  2 +-
 11 files changed, 192 insertions(+), 64 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index b3ca080..a87dc48 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -39,6 +39,8 @@ namespace test {
 const int SMALL_SIZE = 100;
 // Larger size to test some corner cases, only used in some specific cases.
 const int LARGE_SIZE = 10000;
+// Very large size to test dictionary fallback.
+const int VERY_LARGE_SIZE = 400000;
 
 template <typename TestType>
 class TestPrimitiveWriter : public ::testing::Test {
@@ -74,10 +76,10 @@ class TestPrimitiveWriter : public ::testing::Test {
     repetition_levels_out_.resize(SMALL_SIZE);
 
     SetUpSchemaRequired();
-    metadata_accessor_ =
-        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
   }
 
+  Type::type type_num() { return TestType::type_num; }
+
   void BuildReader() {
     auto buffer = sink_->GetBuffer();
     std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -130,7 +132,23 @@
     ASSERT_EQ(this->values_, this->values_out_);
   }
 
-  int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
+  int64_t metadata_num_values() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    auto metadata_accessor =
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->num_values();
+  }
+
+  std::vector<Encoding::type> metadata_encodings() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    auto metadata_accessor =
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->encodings();
+  }
 
 protected:
   int64_t values_read_;
@@ -156,7 +174,6 @@ class TestPrimitiveWriter : public ::testing::Test {
   NodePtr node_;
   format::ColumnChunk thrift_metadata_;
   std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
-  std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
   std::shared_ptr<ColumnDescriptor> schema_;
   std::unique_ptr<InMemoryOutputStream> sink_;
   std::shared_ptr<WriterProperties> writer_properties_;
@@ -334,5 +351,31 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
+// Test case for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
+  this->GenerateData(VERY_LARGE_SIZE);
+
+  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Just read the first SMALL_SIZE rows to ensure we could read it back in
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+  std::vector<Encoding::type> encodings = this->metadata_encodings();
+  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
+  // Dictionary encoding is not allowed for boolean type
+  // There are 2 encodings (RLE, PLAIN) in a non-dictionary encoding case
+  ASSERT_EQ(Encoding::RLE, encodings[0]);
+  if (this->type_num() != Type::BOOLEAN) {
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[2]);
+  } else {
+    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 13dec2c..c06d3de 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -171,7 +171,10 @@ class PageWriter {
  public:
   virtual ~PageWriter() {}
 
-  virtual void Close() = 0;
+  // The ColumnWriter decides whether dictionary encoding is used and whether the
+  // dictionary encoding has fallen back to the default encoding on reaching the
+  // dictionary page limit
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
 
   virtual int64_t WriteDataPage(const DataPage& page) = 0;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
index f1eeaf3..0d7314b 100644
--- a/src/parquet/column/properties-test.cc
+++ b/src/parquet/column/properties-test.cc
@@ -37,7 +37,7 @@ TEST(TestWriterProperties, Basics) {
   std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
 
   ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
-  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
+  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
   ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index c8f103b..91a4672 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -80,7 +80,8 @@ ReaderProperties PARQUET_EXPORT default_reader_properties();
 
 static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
 static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
-static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = ParquetVersion::PARQUET_1_0;
@@ -96,7 +97,8 @@ class PARQUET_EXPORT WriterProperties {
   Builder()
       : allocator_(default_allocator()),
         dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED),
-        dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+        dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
+        write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
         pagesize_(DEFAULT_PAGE_SIZE),
         version_(DEFAULT_WRITER_VERSION),
         created_by_(DEFAULT_CREATED_BY),
@@ -137,8 +139,13 @@ class PARQUET_EXPORT WriterProperties {
     return this->enable_dictionary(path->ToDotString());
   }
 
-  Builder* dictionary_pagesize(int64_t dictionary_psize) {
-    dictionary_pagesize_ = dictionary_psize;
+  Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
+    dictionary_pagesize_limit_ = dictionary_psize_limit;
+    return this;
+  }
+
+  Builder* write_batch_size(int64_t write_batch_size) {
+    write_batch_size_ = write_batch_size;
     return this;
   }
 
@@ -214,17 +221,18 @@ class PARQUET_EXPORT WriterProperties {
   }
 
   std::shared_ptr<WriterProperties> build() {
-    return std::shared_ptr<WriterProperties>(
-        new WriterProperties(allocator_, dictionary_enabled_default_,
-            dictionary_enabled_, dictionary_pagesize_, pagesize_, version_, created_by_,
-            default_encoding_, encodings_, default_codec_, codecs_));
+    return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+        dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_,
+        write_batch_size_, pagesize_, version_, created_by_, default_encoding_,
+        encodings_, default_codec_, codecs_));
   }
 
  private:
   MemoryAllocator* allocator_;
   bool dictionary_enabled_default_;
   std::unordered_map<std::string, bool> dictionary_enabled_;
-  int64_t dictionary_pagesize_;
+  int64_t dictionary_pagesize_limit_;
+  int64_t write_batch_size_;
   int64_t pagesize_;
   ParquetVersion::type version_;
   std::string created_by_;
@@ -246,7 +254,9 @@ class PARQUET_EXPORT WriterProperties {
     return dictionary_enabled_default_;
   }
 
-  inline int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
+  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
+
+  inline int64_t write_batch_size() const { return write_batch_size_; }
 
   inline int64_t data_pagesize() const { return pagesize_; }
 
@@ -286,14 +296,16 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
       std::unordered_map<std::string, bool> dictionary_enabled,
-      int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
-      const std::string& created_by, Encoding::type default_encoding,
+      int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
+      ParquetVersion::type version, const std::string& created_by,
+      Encoding::type default_encoding,
       std::unordered_map<std::string, Encoding::type> encodings,
       Compression::type default_codec, const ColumnCodecs& codecs)
       : allocator_(allocator),
        dictionary_enabled_default_(dictionary_enabled_default),
        dictionary_enabled_(dictionary_enabled),
-        dictionary_pagesize_(dictionary_pagesize),
+        dictionary_pagesize_limit_(dictionary_pagesize),
+        write_batch_size_(write_batch_size),
        pagesize_(pagesize),
        parquet_version_(version),
        parquet_created_by_(created_by),
@@ -304,7 +316,8 @@ class PARQUET_EXPORT WriterProperties {
   MemoryAllocator* allocator_;
   bool dictionary_enabled_default_;
   std::unordered_map<std::string, bool> dictionary_enabled_;
-  int64_t dictionary_pagesize_;
+  int64_t dictionary_pagesize_limit_;
+  int64_t write_batch_size_;
   int64_t pagesize_;
   ParquetVersion::type parquet_version_;
   std::string parquet_created_by_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index da4b17c..1fbea62 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -47,7 +47,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
       num_buffered_encoded_values_(0),
       num_rows_(0),
       total_bytes_written_(0),
-      closed_(false) {
+      closed_(false),
+      fallback_(false) {
   InitSinks();
 }
 
@@ -118,7 +119,13 @@ void ColumnWriter::AddDataPage() {
   DataPage page(
       uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
 
-  data_pages_.push_back(std::move(page));
+  // Write the page to OutputStream eagerly if there is no dictionary or
+  // if dictionary encoding has fallen back to PLAIN
+  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
+    data_pages_.push_back(std::move(page));
+  } else {  // Eagerly write pages
+    WriteDataPage(page);
+  }
 
   // Re-initialize the sinks as GetBuffer made them invalid.
   InitSinks();
 
@@ -134,52 +141,71 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
 
 int64_t ColumnWriter::Close() {
   if (!closed_) {
     closed_ = true;
-    if (has_dictionary_) { WriteDictionaryPage(); }
-    // Write all outstanding data to a new page
-    if (num_buffered_values_ > 0) { AddDataPage(); }
+    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
+
+    FlushBufferedDataPages();
 
-    for (size_t i = 0; i < data_pages_.size(); i++) {
-      WriteDataPage(data_pages_[i]);
-    }
+    pager_->Close(has_dictionary_, fallback_);
   }
 
   if (num_rows_ != expected_rows_) {
     throw ParquetException(
-        "Less then the number of expected rows written in"
+        "Less than the number of expected rows written in"
         " the current column chunk");
   }
 
-  pager_->Close();
-
   return total_bytes_written_;
 }
 
+void ColumnWriter::FlushBufferedDataPages() {
+  // Write all outstanding data to a new page
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+  for (size_t i = 0; i < data_pages_.size(); i++) {
+    WriteDataPage(data_pages_[i]);
+  }
+  data_pages_.clear();
+}
+
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
 template <typename DType>
-TypedColumnWriter<DType>::TypedColumnWriter(const ColumnDescriptor* schema,
+TypedColumnWriter<DType>::TypedColumnWriter(const ColumnDescriptor* descr,
     std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
     const WriterProperties* properties)
-    : ColumnWriter(schema, std::move(pager), expected_rows,
+    : ColumnWriter(descr, std::move(pager), expected_rows,
          (encoding == Encoding::PLAIN_DICTIONARY ||
              encoding == Encoding::RLE_DICTIONARY),
          encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new PlainEncoder<DType>(schema, properties->allocator()));
+      current_encoder_.reset(new PlainEncoder<DType>(descr, properties->allocator()));
      break;
    case Encoding::PLAIN_DICTIONARY:
    case Encoding::RLE_DICTIONARY:
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new DictEncoder<DType>(schema, &pool_, properties->allocator()));
+      current_encoder_.reset(
+          new DictEncoder<DType>(descr, &pool_, properties->allocator()));
      break;
    default:
      ParquetException::NYI("Selected encoding is not supported");
  }
}
 
+// Only one Dictionary Page is written.
+// Fallback to PLAIN if dictionary page limit is reached.
+template <typename DType>
+void TypedColumnWriter<DType>::CheckDictionarySizeLimit() {
+  auto dict_encoder = static_cast<DictEncoder<DType>*>(current_encoder_.get());
+  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
+    WriteDictionaryPage();
+    // Serialize the buffered Dictionary Indices
+    FlushBufferedDataPages();
+    fallback_ = true;
+    // Only PLAIN encoding is supported for fallback in V1
+    current_encoder_.reset(new PlainEncoder<DType>(descr_, properties_->allocator()));
+  }
+}
+
 template <typename DType>
 void TypedColumnWriter<DType>::WriteDictionaryPage() {
   auto dict_encoder = static_cast<DictEncoder<DType>*>(current_encoder_.get());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index c88ead1..6a6ee5f 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -33,6 +33,7 @@
 namespace parquet {
 
+static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
   ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
@@ -56,9 +57,21 @@ class PARQUET_EXPORT ColumnWriter {
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+  // Serializes Dictionary Page if enabled
   virtual void WriteDictionaryPage() = 0;
 
+  // Checks if the Dictionary Page size limit is reached
+  // If the limit is reached, the Dictionary and Data Pages are serialized
+  // The encoding is switched to PLAIN
+
+  virtual void CheckDictionarySizeLimit() = 0;
+
+  // Adds Data Pages to an in memory buffer in dictionary encoding mode
+  // Serializes the Data Pages in other encoding modes
   void AddDataPage();
+
+  // Serializes Data Pages
   void WriteDataPage(const DataPage& page);
 
   // Write multiple definition levels
@@ -70,6 +83,9 @@ class PARQUET_EXPORT ColumnWriter {
   std::shared_ptr<Buffer> RleEncodeLevels(
       const std::shared_ptr<Buffer>& buffer, int16_t max_level);
 
+  // Serialize the buffered Data Pages
+  void FlushBufferedDataPages();
+
   const ColumnDescriptor* descr_;
 
   std::unique_ptr<PageWriter> pager_;
@@ -100,16 +116,22 @@ class PARQUET_EXPORT ColumnWriter {
   // Total number of rows written with this ColumnWriter
   int num_rows_;
 
+  // Records the total number of bytes written by the serializer
   int total_bytes_written_;
+
+  // Flag to check if the Writer has been closed
   bool closed_;
 
+  // Flag to infer if dictionary encoding has fallen back to PLAIN
+  bool fallback_;
+
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
 
+  std::vector<DataPage> data_pages_;
+
  private:
   void InitSinks();
-
-  std::vector<DataPage> data_pages_;
 };
 
 // API to write values to a single column. This is the main client facing API.
@@ -131,26 +153,23 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
     return current_encoder_->FlushValues();
   }
 
   void WriteDictionaryPage() override;
+  void CheckDictionarySizeLimit() override;
 
  private:
+  void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const T* values);
+
   typedef Encoder<DType> EncoderType;
 
   // Write values to a temporary buffer before they are encoded into pages
   void WriteValues(int64_t num_values, const T* values);
-
-  // Map of encoding type to the respective encoder object. For example, a
-  // column chunk's data pages may include both dictionary-encoded and
-  // plain-encoded data.
-  std::unordered_map<int, std::unique_ptr<EncoderType>> encoders_;
-
   std::unique_ptr<EncoderType> current_encoder_;
 };
 
 template <typename DType>
-inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
+inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
   int64_t values_to_write = 0;
-
   // If the field is required and non-repeated, there are no definition levels
   if (descr_->max_definition_level() > 0) {
     for (int64_t i = 0; i < num_values; ++i) {
@@ -178,7 +197,7 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
   }
 
   if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk then expected");
+    throw ParquetException("More rows were written in the column chunk than expected");
   }
 
   WriteValues(values_to_write, values);
@@ -189,6 +208,29 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
   if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
     AddDataPage();
   }
+  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+}
+
+template <typename DType>
+inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes a large
+  // number of values, the chunking will ensure that AddDataPage() is called at a
+  // reasonable pagesize limit
+  int64_t write_batch_size = properties_->write_batch_size();
+  int num_batches = num_values / write_batch_size;
+  int64_t num_remaining = num_values % write_batch_size;
+  for (int round = 0; round < num_batches; round++) {
+    int64_t offset = round * write_batch_size;
+    WriteMiniBatch(
+        write_batch_size, &def_levels[offset], &rep_levels[offset], &values[offset]);
+  }
+  // Write the remaining values
+  int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatch(
+      num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
+}
 
 template <typename DType>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 852072c..a7f83c5 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -64,8 +64,8 @@ TEST(Metadata, TestBuildAccess) {
   // column metadata
   col1_builder->SetStatistics(stats_int);
   col2_builder->SetStatistics(stats_float);
-  col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
-  col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
+  col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
+  col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
   rg1_builder->Finish(1024);
 
   // rowgroup2 metadata
@@ -74,8 +74,8 @@ TEST(Metadata, TestBuildAccess) {
   // column metadata
   col1_builder->SetStatistics(stats_int);
   col2_builder->SetStatistics(stats_float);
-  col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
-  col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
+  col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
+  col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
   rg2_builder->Finish(1024);
 
   // Read the metadata

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/metadata.cc
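
The WriteBatch()/WriteMiniBatch() split above is the "fast path" mentioned in the
commit message: values are fed to WriteMiniBatch() in fixed-size slices so the
page-size and dictionary-size checks run at a bounded interval. As a standalone
illustration of the same offset arithmetic (a sketch written for this note, not
code from the commit; MiniBatchRanges is an invented name), with num_values = 5000
and write_batch_size = 1024 this yields four full slices plus a 904-value remainder:

#include <cstdint>
#include <utility>
#include <vector>

// Returns the (offset, length) pairs that WriteBatch above would pass to
// WriteMiniBatch for a given input size and batch size.
std::vector<std::pair<int64_t, int64_t>> MiniBatchRanges(
    int64_t num_values, int64_t write_batch_size) {
  std::vector<std::pair<int64_t, int64_t>> ranges;
  int64_t num_batches = num_values / write_batch_size;
  for (int64_t round = 0; round < num_batches; round++) {
    // Full mini-batches of exactly write_batch_size values
    ranges.emplace_back(round * write_batch_size, write_batch_size);
  }
  // The remainder, mirroring the trailing WriteMiniBatch call in WriteBatch
  ranges.emplace_back(num_batches * write_batch_size, num_values % write_batch_size);
  return ranges;
}
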
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index bc0f7b9..00ce990 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -353,7 +353,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
   void Finish(int64_t num_values, int64_t dictionary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
-      int64_t uncompressed_size, bool dictionary_fallback = false) {
+      int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
     if (dictionary_page_offset > 0) {
       column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
     } else {
@@ -368,16 +368,18 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
     std::vector<format::Encoding::type> thrift_encodings;
     thrift_encodings.push_back(ToThrift(Encoding::RLE));
-    if (properties_->dictionary_enabled(column_->path())) {
+    if (has_dictionary) {
       thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
       // add the encoding only if it is unique
       if (properties_->version() == ParquetVersion::PARQUET_2_0) {
         thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
       }
-    }
-    if (!properties_->dictionary_enabled(column_->path()) || dictionary_fallback) {
+    } else {  // Dictionary not enabled
       thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
     }
+    // Only PLAIN encoding is supported for fallback in V1
+    // TODO(majetideepak): Use user specified encoding for V2
+    if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); }
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
   }
 
@@ -410,9 +412,10 @@ void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
 
 void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
     int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset,
-    int64_t compressed_size, int64_t uncompressed_size, bool dictionary_fallback) {
+    int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
+    bool dictionary_fallback) {
   impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
-      compressed_size, uncompressed_size, dictionary_fallback);
+      compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
 }
 
 const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 1d96621..0ef6fa0 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -149,7 +149,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
   // commit the metadata
   void Finish(int64_t num_values, int64_t dictonary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
-      int64_t uncompressed_size, bool dictionary_fallback);
+      int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback);
 
  private:
   explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index fb05f13..2d396b7 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -46,11 +46,9 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink,
   compressor_ = Codec::Create(codec);
 }
 
-void SerializedPageWriter::Close() {
-  // index_page_offset = 0 since they are not supported
-  // TODO: Remove default fallback = 'false' when implemented
+void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
   metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-      total_compressed_size_, total_uncompressed_size_, false);
+      total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback);
 }
 
 std::shared_ptr<Buffer> SerializedPageWriter::Compress(

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 645d4bf..e6364e9 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -44,7 +44,7 @@ class SerializedPageWriter : public PageWriter {
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override;
 
-  void Close() override;
+  void Close(bool has_dictionary, bool fallback) override;
 
  private:
   OutputStream* sink_;
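
From the user's side, the whole feature is driven by WriterProperties. A minimal
configuration sketch, assuming only what this patch shows (the Builder methods
dictionary_pagesize_limit(), write_batch_size(), and build(); the first two return
Builder*, so the chain switches to '->' after the first call; MakeFallbackProperties
is an invented name and the values are illustrative):

#include <memory>

#include "parquet/column/properties.h"

std::shared_ptr<parquet::WriterProperties> MakeFallbackProperties() {
  // Dictionary encoding is on by default (DEFAULT_IS_DICTIONARY_ENABLED).
  // Once the dictionary grows past the limit below, CheckDictionarySizeLimit()
  // writes the dictionary page, flushes the buffered data pages, and switches
  // the encoder to PLAIN.
  return parquet::WriterProperties::Builder()
      .dictionary_pagesize_limit(512 * 1024)  // bytes; default is 1 MiB
      ->write_batch_size(2048)  // values per mini-batch; default is 1024
      ->build();
}

On the read side the fallback is visible in the column-chunk metadata, exactly as
the new RequiredVeryLargeChunk test asserts: encodings() reports RLE,
PLAIN_DICTIONARY, PLAIN for a column that fell back, and RLE, PLAIN for a
non-dictionary column.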