parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From n...@apache.org
Subject parquet-cpp git commit: PARQUET-169: Implement support for bulk reading and writing rep/def levels
Date Thu, 11 Feb 2016 04:43:47 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master fdbe437b1 -> 13843d3d4


PARQUET-169: Implement support for bulk reading and writing rep/def levels

Added a LevelDecoder and LevelEncoder class to read and write batches of def/rep levels.
Added tests to verify the functionality.

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes #30 from majetideepak/master and squashes the following commits:

18d7e51 [Deepak Majeti] fixed argument order of asserts inside test cases
5e0000b [Deepak Majeti] PARQUET-169: Implement support for reading repetition and definition levels


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/13843d3d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/13843d3d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/13843d3d

Branch: refs/heads/master
Commit: 13843d3d4d2fe8a2c70b6ab8a6f40f55ebe9680b
Parents: fdbe437
Author: Deepak Majeti <deepak.majeti@hp.com>
Authored: Wed Feb 10 20:43:44 2016 -0800
Committer: Nong Li <nongli@gmail.com>
Committed: Wed Feb 10 20:43:44 2016 -0800

----------------------------------------------------------------------
 src/parquet/column/CMakeLists.txt        |   2 +
 src/parquet/column/column-reader-test.cc |  59 +++++++++-
 src/parquet/column/levels-test.cc        | 129 +++++++++++++++++++++
 src/parquet/column/levels.h              | 156 ++++++++++++++++++++++++++
 src/parquet/column/page.h                |   8 ++
 src/parquet/column/reader.cc             |  67 ++++-------
 src/parquet/column/reader.h              |  17 ++-
 src/parquet/column/test-util.h           |  19 ++--
 src/parquet/reader-test.cc               |   1 -
 src/parquet/reader.cc                    |   6 +-
 10 files changed, 393 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index 7eb334e..423f544 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -18,9 +18,11 @@
 # Headers: top level
 install(FILES
   page.h
+  levels.h
   reader.h
   serialized-page.h
   scanner.h
   DESTINATION include/parquet/column)
 
 ADD_PARQUET_TEST(column-reader-test)
+ADD_PARQUET_TEST(levels-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 920ae56..0d4aea1 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -124,9 +124,6 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
 
   Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
 
-  std::vector<int32_t> vexpected;
-  std::vector<int16_t> dexpected;
-
   size_t values_read = 0;
   size_t batch_actual = 0;
 
@@ -157,6 +154,62 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
   ASSERT_EQ(0, values_read);
 }
 
+TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
+  vector<int32_t> values = {1, 2, 3, 4, 5};
+  vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
+  vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};
+
+  size_t num_values = values.size();
+  parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
+
+  vector<uint8_t> page1;
+  test::DataPageBuilder<Type::INT32> page_builder(&page1);
+
+  // Definition levels precede the values
+  page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE);
+  page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE);
+  page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+
+  pages_.push_back(page_builder.Finish());
+
+  NodePtr type = schema::Int32("a", Repetition::REPEATED);
+  ColumnDescriptor descr(type, 2, 1);
+  InitReader(&descr);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+
+  size_t values_read = 0;
+  size_t batch_actual = 0;
+
+  vector<int32_t> vresult(3, -1);
+  vector<int16_t> dresult(5, -1);
+  vector<int16_t> rresult(5, -1);
+
+  batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
+      &vresult[0], &values_read);
+  ASSERT_EQ(5, batch_actual);
+  ASSERT_EQ(3, values_read);
+
+  ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
+  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
+  ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5)));
+
+  batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
+      &vresult[0], &values_read);
+  ASSERT_EQ(5, batch_actual);
+  ASSERT_EQ(2, values_read);
+
+  ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
+  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
+  ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10)));
+
+  // EOS, pass all nullptrs to check for improper writes. Do not segfault /
+  // core dump
+  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
+      nullptr, &values_read);
+  ASSERT_EQ(0, batch_actual);
+  ASSERT_EQ(0, values_read);
+}
 } // namespace test
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
new file mode 100644
index 0000000..99cc21e
--- /dev/null
+++ b/src/parquet/column/levels-test.cc
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/column/levels.h"
+
+using std::string;
+
+namespace parquet_cpp {
+
+class TestLevels : public ::testing::Test {
+ public:
+  int GenerateLevels(int min_repeat_factor, int max_repeat_factor,
+      int max_level, std::vector<int16_t>& input_levels) {
+    int total_count = 0;
+    // for each repetition count upto max_repeat_factor
+    for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
+      // repeat count increase by a factor of 2 for every iteration
+      int repeat_count = (1 << repeat);
+      // generate levels for repetition count upto the maximum level
+      int value = 0;
+      int bwidth = 0;
+      while (value <= max_level) {
+        for (int i = 0; i < repeat_count; i++) {
+          input_levels[total_count++] = value;
+        }
+        value = (2 << bwidth) - 1;
+        bwidth++;
+      }
+    }
+    return total_count;
+  }
+
+  void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level,
+      std::vector<int16_t>& input_levels) {
+    LevelEncoder encoder;
+    LevelDecoder decoder;
+    int levels_count = 0;
+    std::vector<int16_t> output_levels;
+    std::vector<uint8_t> bytes;
+    int num_levels = input_levels.size();
+    output_levels.resize(num_levels);
+    bytes.resize(2 * num_levels);
+    ASSERT_EQ(num_levels, output_levels.size());
+    ASSERT_EQ(2 * num_levels, bytes.size());
+    // start encoding and decoding
+    if (encoding == parquet::Encoding::RLE) {
+      // leave space to write the rle length value
+      encoder.Init(encoding, max_level, num_levels,
+          bytes.data() + sizeof(uint32_t), bytes.size());
+
+      levels_count = encoder.Encode(num_levels, input_levels.data());
+      (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();
+
+    } else {
+      encoder.Init(encoding, max_level, num_levels,
+          bytes.data(), bytes.size());
+      levels_count = encoder.Encode(num_levels, input_levels.data());
+    }
+
+    ASSERT_EQ(num_levels, levels_count);
+
+    decoder.Init(encoding, max_level, num_levels, bytes.data());
+    levels_count = decoder.Decode(num_levels, output_levels.data());
+
+    ASSERT_EQ(num_levels, levels_count);
+
+    for (int i = 0; i < num_levels; i++) {
+      EXPECT_EQ(input_levels[i], output_levels[i]);
+    }
+  }
+};
+
+// test levels with maximum bit-width from 1 to 8
+// increase the repetition count for each iteration by a factor of 2
+TEST_F(TestLevels, TestEncodeDecodeLevels) {
+  int min_repeat_factor = 0;
+  int max_repeat_factor = 7; // 128
+  int max_bit_width = 8;
+  std::vector<int16_t> input_levels;
+  parquet::Encoding::type encodings[2] = {parquet::Encoding::RLE,
+      parquet::Encoding::BIT_PACKED};
+
+  // for each encoding
+  for (int encode = 0; encode < 2; encode++) {
+    parquet::Encoding::type encoding = encodings[encode];
+    // BIT_PACKED requires a sequence of atleast 8
+    if (encoding == parquet::Encoding::BIT_PACKED) min_repeat_factor = 3;
+
+    // for each maximum bit-width
+    for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
+      int num_levels_per_width = ((2 << max_repeat_factor) - (1 << min_repeat_factor));
+      int num_levels = (bit_width + 1) * num_levels_per_width;
+      input_levels.resize(num_levels);
+      ASSERT_EQ(num_levels, input_levels.size());
+
+      // find the maximum level for the current bit_width
+      int max_level = (1 << bit_width) - 1;
+      // Generate levels
+      int total_count = GenerateLevels(min_repeat_factor, max_repeat_factor,
+          max_level, input_levels);
+      ASSERT_EQ(num_levels, total_count);
+      VerifyLevelsEncoding(encoding, max_level, input_levels);
+    }
+  }
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/levels.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h
new file mode 100644
index 0000000..4056223
--- /dev/null
+++ b/src/parquet/column/levels.h
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_LEVELS_H
+#define PARQUET_COLUMN_LEVELS_H
+
+#include "parquet/exception.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/encodings/encodings.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace parquet_cpp {
+
+class LevelEncoder {
+ public:
+  LevelEncoder() {}
+
+  // Initialize the LevelEncoder.
+  void Init(parquet::Encoding::type encoding, int16_t max_level,
+      int num_buffered_values, uint8_t* data, int data_size) {
+    bit_width_ = BitUtil::Log2(max_level + 1);
+    encoding_ = encoding;
+    switch (encoding) {
+      case parquet::Encoding::RLE: {
+        rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
+        break;
+      }
+      case parquet::Encoding::BIT_PACKED: {
+        int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
+        bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
+        break;
+      }
+      default:
+        throw ParquetException("Unknown encoding type for levels.");
+    }
+  }
+
+  // Encodes a batch of levels from an array and returns the number of levels encoded
+  size_t Encode(size_t batch_size, const int16_t* levels) {
+    size_t num_encoded = 0;
+    if (!rle_encoder_ && !bit_packed_encoder_) {
+      throw ParquetException("Level encoders are not initialized.");
+    }
+
+    if (encoding_ == parquet::Encoding::RLE) {
+      for (size_t i = 0; i < batch_size; ++i) {
+        if (!rle_encoder_->Put(*(levels + i))) {
+          break;
+        }
+        ++num_encoded;
+      }
+      rle_encoder_->Flush();
+      rle_length_ = rle_encoder_->len();
+    } else {
+      for (size_t i = 0; i < batch_size; ++i) {
+        if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
+          break;
+        }
+        ++num_encoded;
+      }
+      bit_packed_encoder_->Flush();
+    }
+    return num_encoded;
+  }
+
+  int32_t len() {
+    assert(encoding_ == parquet::Encoding::RLE);
+    return rle_length_;
+  }
+
+ private:
+  int bit_width_;
+  int rle_length_;
+  parquet::Encoding::type encoding_;
+  std::unique_ptr<RleEncoder> rle_encoder_;
+  std::unique_ptr<BitWriter> bit_packed_encoder_;
+};
+
+
+class LevelDecoder {
+ public:
+  LevelDecoder() {}
+
+  // Initialize the LevelDecoder and return the number of bytes consumed
+  size_t Init(parquet::Encoding::type encoding, int16_t max_level,
+      int num_buffered_values, const uint8_t* data) {
+    uint32_t num_bytes = 0;
+    uint32_t total_bytes = 0;
+    bit_width_ = BitUtil::Log2(max_level + 1);
+    encoding_ = encoding;
+    switch (encoding) {
+      case parquet::Encoding::RLE: {
+        num_bytes = *reinterpret_cast<const uint32_t*>(data);
+        const uint8_t* decoder_data = data + sizeof(uint32_t);
+        rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
+        return sizeof(uint32_t) + num_bytes;
+      }
+      case parquet::Encoding::BIT_PACKED: {
+        num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
+        bit_packed_decoder_.reset(new BitReader(data, num_bytes));
+        return num_bytes;
+      }
+      default:
+        throw ParquetException("Unknown encoding type for levels.");
+    }
+    return -1;
+  }
+
+  // Decodes a batch of levels into an array and returns the number of levels decoded
+  size_t Decode(size_t batch_size, int16_t* levels) {
+    size_t num_decoded = 0;
+    if (!rle_decoder_ && !bit_packed_decoder_) {
+      throw ParquetException("Level decoders are not initialized.");
+    }
+
+    if (encoding_ == parquet::Encoding::RLE) {
+      for (size_t i = 0; i < batch_size; ++i) {
+        if (!rle_decoder_->Get(levels + i)) {
+          break;
+        }
+        ++num_decoded;
+      }
+    } else {
+      for (size_t i = 0; i < batch_size; ++i) {
+        if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) {
+          break;
+        }
+        ++num_decoded;
+      }
+    }
+    return num_decoded;
+  }
+
+ private:
+  int bit_width_;
+  parquet::Encoding::type encoding_;
+  std::unique_ptr<RleDecoder> rle_decoder_;
+  std::unique_ptr<BitReader> bit_packed_decoder_;
+};
+
+} // namespace parquet_cpp
+#endif // PARQUET_COLUMN_LEVELS_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 46f5d62..f2740b6 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -84,6 +84,14 @@ class DataPage : public Page {
     return header_.encoding;
   }
 
+  parquet::Encoding::type repetition_level_encoding() const {
+    return header_.repetition_level_encoding;
+  }
+
+  parquet::Encoding::type definition_level_encoding() const {
+    return header_.definition_level_encoding;
+  }
+
  private:
   parquet::DataPageHeader header_;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index 0fe7a6e..878bd4f 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -59,18 +59,6 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
   current_decoder_ = decoders_[encoding].get();
 }
 
-
-static size_t InitializeLevelDecoder(const uint8_t* buffer,
-    int16_t max_level, std::unique_ptr<RleDecoder>& decoder) {
-  int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
-
-  decoder.reset(new RleDecoder(buffer + sizeof(uint32_t),
-          num_definition_bytes,
-          BitUtil::NumRequiredBits(max_level)));
-
-  return sizeof(uint32_t) + num_definition_bytes;
-}
-
 // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
 // encoding.
 static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) {
@@ -109,24 +97,30 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
       // the page size to determine the number of bytes in the encoded data.
       size_t data_size = page->size();
 
-      max_definition_level_ = descr_->max_definition_level();
-
-      // Read definition levels.
-      if (max_definition_level_ > 0) {
-        // Temporary hack until schema resolution implemented
-
-        size_t def_levels_bytes = InitializeLevelDecoder(buffer,
-            max_definition_level_, definition_level_decoder_);
-
+      int16_t max_definition_level = descr_->max_definition_level();
+      int16_t max_repetition_level = descr_->max_repetition_level();
+      //Data page Layout: Repetition Levels - Definition Levels - encoded values.
+      //Levels are encoded as rle or bit-packed.
+      //Init repetition levels
+      if (max_repetition_level > 0) {
+        size_t rep_levels_bytes = repetition_level_decoder_.Init(
+            page->repetition_level_encoding(),
+            max_repetition_level, num_buffered_values_, buffer);
+        buffer += rep_levels_bytes;
+        data_size -= rep_levels_bytes;
+      }
+      //TODO figure a way to set max_definition_level_ to 0
+      //if the initial value is invalid
+
+      //Init definition levels
+      if (max_definition_level > 0) {
+        size_t def_levels_bytes = definition_level_decoder_.Init(
+            page->definition_level_encoding(),
+            max_definition_level, num_buffered_values_, buffer);
         buffer += def_levels_bytes;
         data_size -= def_levels_bytes;
-      } else {
-        // REQUIRED field
-        max_definition_level_ = 0;
       }
 
-      // TODO: repetition levels
-
       // Get a decoder object for this page or create a new decoder if this is the
       // first page with this encoding.
       parquet::Encoding::type encoding = page->encoding();
@@ -172,31 +166,18 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
 // ----------------------------------------------------------------------
 // Batch read APIs
 
-static size_t DecodeMany(RleDecoder* decoder, int16_t* levels, size_t batch_size) {
-  size_t num_decoded = 0;
-
-  // TODO(wesm): Push this decoding down into RleDecoder itself
-  for (size_t i = 0; i < batch_size; ++i) {
-    if (!decoder->Get(levels + i)) {
-      break;
-    }
-    ++num_decoded;
-  }
-  return num_decoded;
-}
-
 size_t ColumnReader::ReadDefinitionLevels(size_t batch_size, int16_t* levels) {
-  if (!definition_level_decoder_) {
+  if (descr_->max_definition_level() == 0) {
     return 0;
   }
-  return DecodeMany(definition_level_decoder_.get(), levels, batch_size);
+  return definition_level_decoder_.Decode(batch_size, levels);
 }
 
 size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
-  if (!repetition_level_decoder_) {
+  if (descr_->max_repetition_level() == 0) {
     return 0;
   }
-  return DecodeMany(repetition_level_decoder_.get(), levels, batch_size);
+  return repetition_level_decoder_.Decode(batch_size, levels);
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 0d10f0f..4585de8 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -33,9 +33,11 @@
 #include "parquet/encodings/encodings.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/util/rle-encoding.h"
+#include "parquet/column/levels.h"
 
 namespace parquet_cpp {
 
+
 class Codec;
 class Scanner;
 
@@ -85,13 +87,10 @@ class ColumnReader {
   std::shared_ptr<Page> current_page_;
 
   // Not set if full schema for this field has no optional or repeated elements
-  std::unique_ptr<RleDecoder> definition_level_decoder_;
+  LevelDecoder definition_level_decoder_;
 
   // Not set for flat schemas.
-  std::unique_ptr<RleDecoder> repetition_level_decoder_;
-
-  // Temporarily storing this to assist with batch reading
-  int16_t max_definition_level_;
+  LevelDecoder repetition_level_decoder_;
 
   // The total number of values stored in the data page. This is the maximum of
   // the number of encoded definition levels or encoded values. For
@@ -182,13 +181,12 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
   size_t values_to_read = 0;
 
   // If the field is required and non-repeated, there are no definition levels
-  if (definition_level_decoder_) {
+  if (descr_->max_definition_level() > 0) {
     num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
-
     // TODO(wesm): this tallying of values-to-decode can be performed with better
     // cache-efficiency if fused with the level decoding.
     for (size_t i = 0; i < num_def_levels; ++i) {
-      if (def_levels[i] == max_definition_level_) {
+      if (def_levels[i] == descr_->max_definition_level()) {
         ++values_to_read;
       }
     }
@@ -198,9 +196,8 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
   }
 
   // Not present for non-repeated fields
-  if (repetition_level_decoder_) {
+  if (descr_->max_repetition_level() > 0) {
     num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
-
     if (num_def_levels != num_rep_levels) {
       throw ParquetException("Number of decoded rep / def levels did not match");
     }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 9aa4e5a..8861134 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -155,22 +155,19 @@ class DataPageBuilder {
     // TODO: compute a more precise maximum size for the encoded levels
     std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);
 
-    RleEncoder encoder(&encode_buffer[0], encode_buffer.size(),
-        BitUtil::NumRequiredBits(max_level));
-
-    // TODO(wesm): push down vector encoding
-    for (int16_t level : levels) {
-      if (!encoder.Put(level)) {
-        throw ParquetException("out of space");
-      }
-    }
 
-    uint32_t rle_bytes = encoder.Flush();
+    LevelEncoder encoder;
+    encoder.Init(encoding, max_level, levels.size(),
+        encode_buffer.data(), encode_buffer.size());
+
+    encoder.Encode(levels.size(), levels.data());
+
+    uint32_t rle_bytes = encoder.len();
     size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
     Reserve(levels_footprint);
 
     *reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
-    memcpy(Head() + sizeof(uint32_t), encoder.buffer(), rle_bytes);
+    memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes);
     buffer_size_ += levels_footprint;
   }
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 49f25f0..ffc882c 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -89,7 +89,6 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
 
   // column 0, id
   std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
-
   int32_t val;
   bool is_null;
   for (size_t i = 0; i < 8; ++i) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13843d3d/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index d3bc0a6..2f30ebf 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -242,10 +242,10 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
            << ")" << std::endl;
   }
 
-  for (int i = 0; i < num_row_groups(); ++i) {
-    stream << "--- Row Group " << i << " ---\n";
+  for (int r = 0; r < num_row_groups(); ++r) {
+    stream << "--- Row Group " << r << " ---\n";
 
-    RowGroupReader* group_reader = RowGroup(i);
+    RowGroupReader* group_reader = RowGroup(r);
 
     // Print column metadata
     size_t num_columns = group_reader->num_columns();


Mime
View raw message