parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-843: Impala is thrown off by a REPEATED root schema node
Date Thu, 26 Jan 2017 02:42:47 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master aacc8b528 -> 257e65b81


PARQUET-843: Impala is thrown off by a REPEATED root schema node

We do not use the repetition level of the root node in the schema, and neither does parquet-mr.
However, when the root node is marked REPEATED, Impala 2.8.0 increases the max_repetition_level
and expects to find repetition levels that are not being written out. With this change, which
marks the root schema node as REQUIRED instead, Impala is able to read our files again.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #227 from wesm/PARQUET-843 and squashes the following commits:

dec382a [Wes McKinney] Impala is thrown off by a REPEATED root schema node. Some uint32_t->int32_t changes


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/257e65b8
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/257e65b8
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/257e65b8

Branch: refs/heads/master
Commit: 257e65b8136463b716eeeee53836aec9916d0c15
Parents: aacc8b5
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Wed Jan 25 21:42:37 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Wed Jan 25 21:42:37 2017 -0500

----------------------------------------------------------------------
 src/parquet/arrow/schema.cc              |  2 +-
 src/parquet/column/column-writer-test.cc |  9 +++++----
 src/parquet/column/level-benchmark.cc    |  6 +++---
 src/parquet/column/levels-test.cc        |  4 ++--
 src/parquet/column/levels.cc             |  8 ++++----
 src/parquet/column/test-util.h           |  4 ++--
 src/parquet/column/writer.cc             |  9 ++++-----
 src/parquet/file/metadata.cc             | 11 ++++++-----
 8 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 4f17f5e..65e3381 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -406,7 +406,7 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
     RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
   }
 
-  NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+  NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
   *out = std::make_shared<::parquet::SchemaDescriptor>();
   PARQUET_CATCH_NOT_OK((*out)->Init(schema));
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index fc944ca..dedb2c2 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -431,12 +431,13 @@ TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
   // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
   // Dictionary encoding is not allowed for boolean type
   // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
-  ASSERT_EQ(Encoding::RLE, encodings[0]);
   if (this->type_num() != Type::BOOLEAN) {
-    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
-    ASSERT_EQ(Encoding::PLAIN, encodings[2]);
-  } else {
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]);
     ASSERT_EQ(Encoding::PLAIN, encodings[1]);
+    ASSERT_EQ(Encoding::RLE, encodings[2]);
+  } else {
+    ASSERT_EQ(Encoding::PLAIN, encodings[0]);
+    ASSERT_EQ(Encoding::RLE, encodings[1]);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/level-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/level-benchmark.cc b/src/parquet/column/level-benchmark.cc
index 8ae2fe1..036108f 100644
--- a/src/parquet/column/level-benchmark.cc
+++ b/src/parquet/column/level-benchmark.cc
@@ -55,11 +55,11 @@ static void BM_RleDecoding(::benchmark::State& state) {
   int16_t max_level = 1;
   int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
   auto buffer_rle = std::make_shared<PoolBuffer>();
-  PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(uint32_t)));
+  PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
   level_encoder.Init(Encoding::RLE, max_level, levels.size(),
-      buffer_rle->mutable_data() + sizeof(uint32_t), rle_size);
+      buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
   level_encoder.Encode(levels.size(), levels.data());
-  reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
+  reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
 
   while (state.KeepRunning()) {
     LevelDecoder level_decoder;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index cee2763..1d29313 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -57,10 +57,10 @@ void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
   if (encoding == Encoding::RLE) {
     // leave space to write the rle length value
     encoder.Init(
-        encoding, max_level, num_levels, bytes.data() + sizeof(uint32_t), bytes.size());
+        encoding, max_level, num_levels, bytes.data() + sizeof(int32_t), bytes.size());
 
     levels_count = encoder.Encode(num_levels, input_levels);
-    (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();
+    (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
   } else {
     encoder.Init(encoding, max_level, num_levels, bytes.data(), bytes.size());
     levels_count = encoder.Encode(num_levels, input_levels);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/levels.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.cc b/src/parquet/column/levels.cc
index 3e7b9df..716e08a 100644
--- a/src/parquet/column/levels.cc
+++ b/src/parquet/column/levels.cc
@@ -96,20 +96,20 @@ LevelDecoder::~LevelDecoder() {}
 
 int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
     int num_buffered_values, const uint8_t* data) {
-  uint32_t num_bytes = 0;
+  int32_t num_bytes = 0;
   encoding_ = encoding;
   num_values_remaining_ = num_buffered_values;
   bit_width_ = BitUtil::Log2(max_level + 1);
   switch (encoding) {
     case Encoding::RLE: {
-      num_bytes = *reinterpret_cast<const uint32_t*>(data);
-      const uint8_t* decoder_data = data + sizeof(uint32_t);
+      num_bytes = *reinterpret_cast<const int32_t*>(data);
+      const uint8_t* decoder_data = data + sizeof(int32_t);
       if (!rle_decoder_) {
         rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
       } else {
         rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
       }
-      return sizeof(uint32_t) + num_bytes;
+      return sizeof(int32_t) + num_bytes;
     }
     case Encoding::BIT_PACKED: {
       num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 9efa623..f0580f5 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -182,8 +182,8 @@ class DataPageBuilder {
 
     encoder.Encode(levels.size(), levels.data());
 
-    uint32_t rle_bytes = encoder.len();
-    sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(uint32_t));
+    int32_t rle_bytes = encoder.len();
+    sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t));
     sink_->Write(encode_buffer.data(), rle_bytes);
   }
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 406ded1..f06ac30 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -77,17 +77,16 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
   // TODO: This only works with due to some RLE specifics
   int64_t rle_size =
       LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) +
-      sizeof(uint32_t);
+      sizeof(int32_t);
   std::shared_ptr<PoolBuffer> buffer_rle =
       AllocateBuffer(properties_->allocator(), rle_size);
   level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_,
-      buffer_rle->mutable_data() + sizeof(uint32_t),
-      buffer_rle->size() - sizeof(uint32_t));
+      buffer_rle->mutable_data() + sizeof(int32_t), buffer_rle->size() - sizeof(int32_t));
   int encoded = level_encoder_.Encode(
       num_buffered_values_, reinterpret_cast<const int16_t*>(buffer->data()));
   DCHECK_EQ(encoded, num_buffered_values_);
-  reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len();
-  int64_t encoded_size = level_encoder_.len() + sizeof(uint32_t);
+  reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len();
+  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
   DCHECK(rle_size >= encoded_size);
   PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size));
   return std::static_pointer_cast<Buffer>(buffer_rle);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index d9acf41..2f7f9d2 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -485,16 +485,17 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);
     column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
     std::vector<format::Encoding::type> thrift_encodings;
-    thrift_encodings.push_back(ToThrift(Encoding::RLE));
     if (has_dictionary) {
-      thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
-      // add the encoding only if it is unique
-      if (properties_->version() == ParquetVersion::PARQUET_2_0) {
-        thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
+      thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
+      if (properties_->version() == ParquetVersion::PARQUET_1_0) {
+        thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
+      } else {
+        thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
       }
     } else {  // Dictionary not enabled
       thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
     }
+    thrift_encodings.push_back(ToThrift(Encoding::RLE));
     // Only PLAIN encoding is supported for fallback in V1
     // TODO(majetideepak): Use user specified encoding for V2
     if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); }


Mime
View raw message