parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-687: C++: Switch to PLAIN encoding if dictionary grows too large
Date Thu, 15 Sep 2016 04:04:43 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 55604b297 -> c6f5ebe52


PARQUET-687: C++: Switch to PLAIN encoding if dictionary grows too large

Implemented dictionary fallback encoding
Added tests
Added a fast path to serialize data pages

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #157 from majetideepak/PARQUET-717 and squashes the following commits:

6f51df6 [Deepak Majeti] minor comment fix
c498aeb [Deepak Majeti] modify comment style
eac9114 [Deepak Majeti] clang format
da46033 [Deepak Majeti] added comments and fixed review suggestions
312bad8 [Deepak Majeti] minor changes
dd0cc7e [Deepak Majeti] Add all types to the test
54af38a [Deepak Majeti] clang format
84f360d [Deepak Majeti] added dictionary fallback support with tests


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c6f5ebe5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c6f5ebe5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c6f5ebe5

Branch: refs/heads/master
Commit: c6f5ebe5207c541b691f75cad0eb32e517522201
Parents: 55604b2
Author: Deepak Majeti <deepak.majeti@hpe.com>
Authored: Thu Sep 15 00:04:35 2016 -0400
Committer: Wes McKinney <wesm@apache.org>
Committed: Thu Sep 15 00:04:35 2016 -0400

----------------------------------------------------------------------
 src/parquet/column/column-writer-test.cc | 51 +++++++++++++++++++--
 src/parquet/column/page.h                |  5 ++-
 src/parquet/column/properties-test.cc    |  2 +-
 src/parquet/column/properties.h          | 41 +++++++++++------
 src/parquet/column/writer.cc             | 60 ++++++++++++++++++-------
 src/parquet/column/writer.h              | 64 ++++++++++++++++++++++-----
 src/parquet/file/file-metadata-test.cc   |  8 ++--
 src/parquet/file/metadata.cc             | 15 ++++---
 src/parquet/file/metadata.h              |  2 +-
 src/parquet/file/writer-internal.cc      |  6 +--
 src/parquet/file/writer-internal.h       |  2 +-
 11 files changed, 192 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index b3ca080..a87dc48 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -39,6 +39,8 @@ namespace test {
 const int SMALL_SIZE = 100;
 // Larger size to test some corner cases, only used in some specific cases.
 const int LARGE_SIZE = 10000;
+// Very large size to test dictionary fallback.
+const int VERY_LARGE_SIZE = 400000;
 
 template <typename TestType>
 class TestPrimitiveWriter : public ::testing::Test {
@@ -74,10 +76,10 @@ class TestPrimitiveWriter : public ::testing::Test {
     repetition_levels_out_.resize(SMALL_SIZE);
 
     SetUpSchemaRequired();
-    metadata_accessor_ =
-        ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
   }
 
+  Type::type type_num() { return TestType::type_num; }
+
   void BuildReader() {
     auto buffer = sink_->GetBuffer();
     std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -130,7 +132,23 @@ class TestPrimitiveWriter : public ::testing::Test {
     ASSERT_EQ(this->values_, this->values_out_);
   }
 
-  int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
+  int64_t metadata_num_values() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    auto metadata_accessor =
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->num_values();
+  }
+
+  std::vector<Encoding::type> metadata_encodings() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    auto metadata_accessor =
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->encodings();
+  }
 
  protected:
   int64_t values_read_;
@@ -156,7 +174,6 @@ class TestPrimitiveWriter : public ::testing::Test {
   NodePtr node_;
   format::ColumnChunk thrift_metadata_;
   std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
-  std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
   std::shared_ptr<ColumnDescriptor> schema_;
   std::unique_ptr<InMemoryOutputStream> sink_;
   std::shared_ptr<WriterProperties> writer_properties_;
@@ -334,5 +351,31 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
+// Test case for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
+  this->GenerateData(VERY_LARGE_SIZE);
+
+  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Just read the first SMALL_SIZE rows to ensure we could read it back in
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+  std::vector<Encoding::type> encodings = this->metadata_encodings();
+  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
+  // Dictionary encoding is not allowed for boolean type
+  // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
+  ASSERT_EQ(Encoding::RLE, encodings[0]);
+  if (this->type_num() != Type::BOOLEAN) {
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[2]);
+  } else {
+    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 13dec2c..c06d3de 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -171,7 +171,10 @@ class PageWriter {
  public:
   virtual ~PageWriter() {}
 
-  virtual void Close() = 0;
+  // The Column Writer tells the pager whether dictionary encoding was used
+  // (has_dictionary) and whether it fell back to the default encoding after
+  // reaching the dictionary page size limit (fallback)
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
 
   virtual int64_t WriteDataPage(const DataPage& page) = 0;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
index f1eeaf3..0d7314b 100644
--- a/src/parquet/column/properties-test.cc
+++ b/src/parquet/column/properties-test.cc
@@ -37,7 +37,7 @@ TEST(TestWriterProperties, Basics) {
   std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
 
   ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
-  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
+  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
   ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index c8f103b..91a4672 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -80,7 +80,8 @@ ReaderProperties PARQUET_EXPORT default_reader_properties();
 
 static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
 static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
-static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
@@ -96,7 +97,8 @@ class PARQUET_EXPORT WriterProperties {
     Builder()
         : allocator_(default_allocator()),
           dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED),
-          dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+          dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
+          write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           pagesize_(DEFAULT_PAGE_SIZE),
           version_(DEFAULT_WRITER_VERSION),
           created_by_(DEFAULT_CREATED_BY),
@@ -137,8 +139,13 @@ class PARQUET_EXPORT WriterProperties {
       return this->enable_dictionary(path->ToDotString());
     }
 
-    Builder* dictionary_pagesize(int64_t dictionary_psize) {
-      dictionary_pagesize_ = dictionary_psize;
+    Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
+      dictionary_pagesize_limit_ = dictionary_psize_limit;
+      return this;
+    }
+
+    Builder* write_batch_size(int64_t write_batch_size) {
+      write_batch_size_ = write_batch_size;
       return this;
     }
 
@@ -214,17 +221,18 @@ class PARQUET_EXPORT WriterProperties {
     }
 
     std::shared_ptr<WriterProperties> build() {
-      return std::shared_ptr<WriterProperties>(
-          new WriterProperties(allocator_, dictionary_enabled_default_,
-              dictionary_enabled_, dictionary_pagesize_, pagesize_, version_, created_by_,
-              default_encoding_, encodings_, default_codec_, codecs_));
+      return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+          dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_,
+          write_batch_size_, pagesize_, version_, created_by_, default_encoding_,
+          encodings_, default_codec_, codecs_));
     }
 
    private:
     MemoryAllocator* allocator_;
     bool dictionary_enabled_default_;
     std::unordered_map<std::string, bool> dictionary_enabled_;
-    int64_t dictionary_pagesize_;
+    int64_t dictionary_pagesize_limit_;
+    int64_t write_batch_size_;
     int64_t pagesize_;
     ParquetVersion::type version_;
     std::string created_by_;
@@ -246,7 +254,9 @@ class PARQUET_EXPORT WriterProperties {
     return dictionary_enabled_default_;
   }
 
-  inline int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
+  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
+
+  inline int64_t write_batch_size() const { return write_batch_size_; }
 
   inline int64_t data_pagesize() const { return pagesize_; }
 
@@ -286,14 +296,16 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
       std::unordered_map<std::string, bool> dictionary_enabled,
-      int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
-      const std::string& created_by, Encoding::type default_encoding,
+      int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
+      ParquetVersion::type version, const std::string& created_by,
+      Encoding::type default_encoding,
       std::unordered_map<std::string, Encoding::type> encodings,
       Compression::type default_codec, const ColumnCodecs& codecs)
       : allocator_(allocator),
         dictionary_enabled_default_(dictionary_enabled_default),
         dictionary_enabled_(dictionary_enabled),
-        dictionary_pagesize_(dictionary_pagesize),
+        dictionary_pagesize_limit_(dictionary_pagesize),
+        write_batch_size_(write_batch_size),
         pagesize_(pagesize),
         parquet_version_(version),
         parquet_created_by_(created_by),
@@ -304,7 +316,8 @@ class PARQUET_EXPORT WriterProperties {
   MemoryAllocator* allocator_;
   bool dictionary_enabled_default_;
   std::unordered_map<std::string, bool> dictionary_enabled_;
-  int64_t dictionary_pagesize_;
+  int64_t dictionary_pagesize_limit_;
+  int64_t write_batch_size_;
   int64_t pagesize_;
   ParquetVersion::type parquet_version_;
   std::string parquet_created_by_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index da4b17c..1fbea62 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -47,7 +47,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
       num_buffered_encoded_values_(0),
       num_rows_(0),
       total_bytes_written_(0),
-      closed_(false) {
+      closed_(false),
+      fallback_(false) {
   InitSinks();
 }
 
@@ -118,7 +119,13 @@ void ColumnWriter::AddDataPage() {
   DataPage page(
       uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
 
-  data_pages_.push_back(std::move(page));
+  // Write the page to OutputStream eagerly if there is no dictionary or
+  // if dictionary encoding has fallen back to PLAIN
+  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
+    data_pages_.push_back(std::move(page));
+  } else {  // Eagerly write pages
+    WriteDataPage(page);
+  }
 
   // Re-initialize the sinks as GetBuffer made them invalid.
   InitSinks();
@@ -134,52 +141,71 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
 int64_t ColumnWriter::Close() {
   if (!closed_) {
     closed_ = true;
-    if (has_dictionary_) { WriteDictionaryPage(); }
-    // Write all outstanding data to a new page
-    if (num_buffered_values_ > 0) { AddDataPage(); }
+    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
+
+    FlushBufferedDataPages();
 
-    for (size_t i = 0; i < data_pages_.size(); i++) {
-      WriteDataPage(data_pages_[i]);
-    }
+    pager_->Close(has_dictionary_, fallback_);
   }
 
   if (num_rows_ != expected_rows_) {
     throw ParquetException(
-        "Less then the number of expected rows written in"
+        "Less than the number of expected rows written in"
         " the current column chunk");
   }
 
-  pager_->Close();
-
   return total_bytes_written_;
 }
 
+void ColumnWriter::FlushBufferedDataPages() {
+  // Write all outstanding data to a new page
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+  for (size_t i = 0; i < data_pages_.size(); i++) {
+    WriteDataPage(data_pages_[i]);
+  }
+  data_pages_.clear();
+}
+
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
 template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
+TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
     std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
     const WriterProperties* properties)
-    : ColumnWriter(schema, std::move(pager), expected_rows,
+    : ColumnWriter(descr, std::move(pager), expected_rows,
           (encoding == Encoding::PLAIN_DICTIONARY ||
                        encoding == Encoding::RLE_DICTIONARY),
           encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new PlainEncoder<Type>(schema, properties->allocator()));
+      current_encoder_.reset(new PlainEncoder<Type>(descr, properties->allocator()));
       break;
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE_DICTIONARY:
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new DictEncoder<Type>(schema, &pool_, properties->allocator()));
+      current_encoder_.reset(
+          new DictEncoder<Type>(descr, &pool_, properties->allocator()));
       break;
     default:
       ParquetException::NYI("Selected encoding is not supported");
   }
 }
 
+// Only one Dictionary Page is written.
+// Fallback to PLAIN if dictionary page limit is reached.
+template <typename Type>
+void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
+  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
+    WriteDictionaryPage();
+    // Serialize the buffered Dictionary Indices
+    FlushBufferedDataPages();
+    fallback_ = true;
+    // Only PLAIN encoding is supported for fallback in V1
+    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator()));
+  }
+}
+
 template <typename Type>
 void TypedColumnWriter<Type>::WriteDictionaryPage() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index c88ead1..6a6ee5f 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -33,6 +33,7 @@
 
 namespace parquet {
 
+static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
   ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
@@ -56,9 +57,21 @@ class PARQUET_EXPORT ColumnWriter {
 
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+  // Serializes Dictionary Page if enabled
   virtual void WriteDictionaryPage() = 0;
 
+  // Checks if the Dictionary Page size limit is reached
+  // If the limit is reached, the Dictionary and Data Pages are serialized
+  // The encoding is switched to PLAIN
+
+  virtual void CheckDictionarySizeLimit() = 0;
+
+  // Adds Data Pages to an in memory buffer in dictionary encoding mode
+  // Serializes the Data Pages in other encoding modes
   void AddDataPage();
+
+  // Serializes Data Pages
   void WriteDataPage(const DataPage& page);
 
   // Write multiple definition levels
@@ -70,6 +83,9 @@ class PARQUET_EXPORT ColumnWriter {
   std::shared_ptr<Buffer> RleEncodeLevels(
       const std::shared_ptr<Buffer>& buffer, int16_t max_level);
 
+  // Serialize the buffered Data Pages
+  void FlushBufferedDataPages();
+
   const ColumnDescriptor* descr_;
 
   std::unique_ptr<PageWriter> pager_;
@@ -100,16 +116,22 @@ class PARQUET_EXPORT ColumnWriter {
   // Total number of rows written with this ColumnWriter
   int num_rows_;
 
+  // Records the total number of bytes written by the serializer
   int total_bytes_written_;
+
+  // Flag to check if the Writer has been closed
   bool closed_;
 
+  // Flag to infer if dictionary encoding has fallen back to PLAIN
+  bool fallback_;
+
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
 
+  std::vector<DataPage> data_pages_;
+
  private:
   void InitSinks();
-
-  std::vector<DataPage> data_pages_;
 };
 
 // API to write values to a single column. This is the main client facing API.
@@ -131,26 +153,23 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
     return current_encoder_->FlushValues();
   }
   void WriteDictionaryPage() override;
+  void CheckDictionarySizeLimit() override;
 
  private:
+  void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const T* values);
+
   typedef Encoder<DType> EncoderType;
 
   // Write values to a temporary buffer before they are encoded into pages
   void WriteValues(int64_t num_values, const T* values);
-
-  // Map of encoding type to the respective encoder object. For example, a
-  // column chunk's data pages may include both dictionary-encoded and
-  // plain-encoded data.
-  std::unordered_map<int, std::shared_ptr<EncoderType>> encoders_;
-
   std::unique_ptr<EncoderType> current_encoder_;
 };
 
 template <typename DType>
-inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
+inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
   int64_t values_to_write = 0;
-
   // If the field is required and non-repeated, there are no definition levels
   if (descr_->max_definition_level() > 0) {
     for (int64_t i = 0; i < num_values; ++i) {
@@ -178,7 +197,7 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
   }
 
   if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk then expected");
+    throw ParquetException("More rows were written in the column chunk than expected");
   }
 
   WriteValues(values_to_write, values);
@@ -189,6 +208,29 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
   if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
     AddDataPage();
   }
+  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+}
+
+template <typename DType>
+inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes large number
+  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
+  // pagesize limit
+  int64_t write_batch_size = properties_->write_batch_size();
+  int num_batches = num_values / write_batch_size;
+  int64_t num_remaining = num_values % write_batch_size;
+  for (int round = 0; round < num_batches; round++) {
+    int64_t offset = round * write_batch_size;
+    WriteMiniBatch(
+        write_batch_size, &def_levels[offset], &rep_levels[offset], &values[offset]);
+  }
+  // Write the remaining values
+  int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatch(
+      num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
 }
 
 template <typename DType>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 852072c..a7f83c5 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -64,8 +64,8 @@ TEST(Metadata, TestBuildAccess) {
   // column metadata
   col1_builder->SetStatistics(stats_int);
   col2_builder->SetStatistics(stats_float);
-  col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
-  col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
+  col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
+  col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
   rg1_builder->Finish(1024);
 
   // rowgroup2 metadata
@@ -74,8 +74,8 @@ TEST(Metadata, TestBuildAccess) {
   // column metadata
   col1_builder->SetStatistics(stats_int);
   col2_builder->SetStatistics(stats_float);
-  col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
-  col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
+  col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
+  col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
   rg2_builder->Finish(1024);
 
   // Read the metadata

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index bc0f7b9..00ce990 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -353,7 +353,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
 
   void Finish(int64_t num_values, int64_t dictionary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
-      int64_t uncompressed_size, bool dictionary_fallback = false) {
+      int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
     if (dictionary_page_offset > 0) {
       column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
     } else {
@@ -368,16 +368,18 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
     std::vector<format::Encoding::type> thrift_encodings;
     thrift_encodings.push_back(ToThrift(Encoding::RLE));
-    if (properties_->dictionary_enabled(column_->path())) {
+    if (has_dictionary) {
       thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
       // add the encoding only if it is unique
       if (properties_->version() == ParquetVersion::PARQUET_2_0) {
         thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
       }
-    }
-    if (!properties_->dictionary_enabled(column_->path()) || dictionary_fallback) {
+    } else {  // Dictionary not enabled
       thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
     }
+    // Only PLAIN encoding is supported for fallback in V1
+    // TODO(majetideepak): Use user specified encoding for V2
+    if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); }
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
   }
 
@@ -410,9 +412,10 @@ void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
 
 void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
     int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset,
-    int64_t compressed_size, int64_t uncompressed_size, bool dictionary_fallback) {
+    int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
+    bool dictionary_fallback) {
   impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
-      compressed_size, uncompressed_size, dictionary_fallback);
+      compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
 }
 
 const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 1d96621..0ef6fa0 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -149,7 +149,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
   // commit the metadata
   void Finish(int64_t num_values, int64_t dictonary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
-      int64_t uncompressed_size, bool dictionary_fallback);
+      int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback);
 
  private:
   explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index fb05f13..2d396b7 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -46,11 +46,9 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
   compressor_ = Codec::Create(codec);
 }
 
-void SerializedPageWriter::Close() {
-  // index_page_offset = 0 since they are not supported
-  // TODO: Remove default fallback = 'false' when implemented
+void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
   metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-      total_compressed_size_, total_uncompressed_size_, false);
+      total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback);
 }
 
 std::shared_ptr<Buffer> SerializedPageWriter::Compress(

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 645d4bf..e6364e9 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -44,7 +44,7 @@ class SerializedPageWriter : public PageWriter {
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override;
 
-  void Close() override;
+  void Close(bool has_dictionary, bool fallback) override;
 
  private:
   OutputStream* sink_;


Mime
View raw message