parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-625: Improve RLE read performance
Date Fri, 03 Jun 2016 18:22:46 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master bd887e2ca -> 483608a1a


PARQUET-625: Improve RLE read performance

Also fixes the RLE decoding benchmark which did not pass the size to the
buffer.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #111 from xhochy/parquet-625 and squashes the following commits:

94d1e60 [Uwe L. Korn] Cast to signed int
4ccc149 [Uwe L. Korn] Formatting fixes
e117706 [Uwe L. Korn] Use int instead of uint32_t for the batch size
a4241ec [Uwe L. Korn] Remove obsolete TODOs
65091be [Uwe L. Korn] PARQUET-625: Improve RLE write performance


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/483608a1
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/483608a1
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/483608a1

Branch: refs/heads/master
Commit: 483608a1a11b1342aa99690f698c2e8a47f346c7
Parents: bd887e2
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Fri Jun 3 11:23:01 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Fri Jun 3 11:23:01 2016 -0700

----------------------------------------------------------------------
 src/parquet/column/level-benchmark.cc | 21 +++++++++-----
 src/parquet/column/levels.h           |  5 +---
 src/parquet/util/rle-encoding.h       | 37 +++++++++++++++++++++++++
 src/parquet/util/rle-test.cc          | 44 +++++++++++++++++++++++-------
 4 files changed, 86 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/483608a1/src/parquet/column/level-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/level-benchmark.cc b/src/parquet/column/level-benchmark.cc
index fbb3181..c511c36 100644
--- a/src/parquet/column/level-benchmark.cc
+++ b/src/parquet/column/level-benchmark.cc
@@ -25,8 +25,10 @@ namespace parquet {
 namespace benchmark {
 
 static void BM_RleEncoding(::benchmark::State& state) {
-  // TODO: More than just all 0s
   std::vector<int16_t> levels(state.range_x(), 0);
+  int64_t n = 0;
+  std::generate(levels.begin(), levels.end(),
+      [&state, &n] { return (n++ % state.range_y()) == 0; });
   int16_t max_level = 1;
   int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
   auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size);
@@ -38,20 +40,24 @@ static void BM_RleEncoding(::benchmark::State& state) {
     level_encoder.Encode(levels.size(), levels.data());
   }
   state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(int16_t));
+  state.SetItemsProcessed(state.iterations() * state.range_x());
 }
 
-BENCHMARK(BM_RleEncoding)->Range(1024, 65536);
+BENCHMARK(BM_RleEncoding)->RangePair(1024, 65536, 1, 16);
 
 static void BM_RleDecoding(::benchmark::State& state) {
   LevelEncoder level_encoder;
-  // TODO: More than just all 0s
   std::vector<int16_t> levels(state.range_x(), 0);
+  int64_t n = 0;
+  std::generate(levels.begin(), levels.end(),
+      [&state, &n] { return (n++ % state.range_y()) == 0; });
   int16_t max_level = 1;
   int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
-  auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size);
-  level_encoder.Init(Encoding::RLE, max_level, levels.size(), buffer_rle->mutable_data(),
-      buffer_rle->size());
+  auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size + sizeof(uint32_t));
+  level_encoder.Init(Encoding::RLE, max_level, levels.size(),
+      buffer_rle->mutable_data() + sizeof(uint32_t), rle_size);
   level_encoder.Encode(levels.size(), levels.data());
+  reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
 
   while (state.KeepRunning()) {
     LevelDecoder level_decoder;
@@ -60,9 +66,10 @@ static void BM_RleDecoding(::benchmark::State& state) {
   }
 
   state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(int16_t));
+  state.SetItemsProcessed(state.iterations() * state.range_x());
 }
 
-BENCHMARK(BM_RleDecoding)->Range(1024, 65536);
+BENCHMARK(BM_RleDecoding)->RangePair(1024, 65536, 1, 16);
 
 }  // namespace benchmark
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/483608a1/src/parquet/column/levels.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h
index 20261df..ce751d0 100644
--- a/src/parquet/column/levels.h
+++ b/src/parquet/column/levels.h
@@ -156,10 +156,7 @@ class LevelDecoder {
 
     int num_values = std::min(num_values_remaining_, batch_size);
     if (encoding_ == Encoding::RLE) {
-      for (int i = 0; i < num_values; ++i) {
-        if (!rle_decoder_->Get(levels + i)) { break; }
-        ++num_decoded;
-      }
+      num_decoded = rle_decoder_->GetBatch(levels, num_values);
     } else {
       for (int i = 0; i < num_values; ++i) {
         if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { break; }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/483608a1/src/parquet/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/rle-encoding.h b/src/parquet/util/rle-encoding.h
index 20c9621..b7fa83a 100644
--- a/src/parquet/util/rle-encoding.h
+++ b/src/parquet/util/rle-encoding.h
@@ -110,6 +110,10 @@ class RleDecoder {
   template <typename T>
   bool Get(T* val);
 
+  /// Gets a batch of values.  Returns the number of decoded elements.
+  template <typename T>
+  int GetBatch(T* values, int batch_size);
+
  protected:
   BitReader bit_reader_;
   /// Number of bits needed to encode the value. Must be between 0 and 64.
@@ -268,6 +272,39 @@ inline bool RleDecoder::Get(T* val) {
 }
 
 template <typename T>
+inline int RleDecoder::GetBatch(T* values, int batch_size) {
+  DCHECK_GE(bit_width_, 0);
+  int values_read = 0;
+
+  while (values_read < batch_size) {
+    if (UNLIKELY(literal_count_ == 0 && repeat_count_ == 0)) {
+      if (!NextCounts<T>()) return values_read;
+    }
+
+    if (LIKELY(repeat_count_ > 0)) {
+      int repeat_batch =
+          std::min(batch_size - values_read, static_cast<int>(repeat_count_));
+      std::fill(
+          values + values_read, values + values_read + repeat_batch, current_value_);
+      repeat_count_ -= repeat_batch;
+      values_read += repeat_batch;
+    } else {
+      DCHECK_GT(literal_count_, 0);
+      int literal_batch =
+          std::min(batch_size - values_read, static_cast<int>(literal_count_));
+      for (int i = 0; i < literal_batch; i++) {
+        bool result = bit_reader_.GetValue(bit_width_, values + values_read + i);
+        DCHECK(result);
+      }
+      literal_count_ -= literal_batch;
+      values_read += literal_batch;
+    }
+  }
+
+  return values_read;
+}
+
+template <typename T>
 bool RleDecoder::NextCounts() {
   // Read the next run's indicator int, it could be a literal or repeated run.
   // The int is encoded as a vlq-encoded value.

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/483608a1/src/parquet/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/rle-test.cc b/src/parquet/util/rle-test.cc
index 6cfaf9d..c7d95d4 100644
--- a/src/parquet/util/rle-test.cc
+++ b/src/parquet/util/rle-test.cc
@@ -195,12 +195,22 @@ void ValidateRle(const vector<int>& values, int bit_width, uint8_t* expected_enc
   }
 
   // Verify read
-  RleDecoder decoder(buffer, len, bit_width);
-  for (size_t i = 0; i < values.size(); ++i) {
-    uint64_t val;
-    bool result = decoder.Get(&val);
-    EXPECT_TRUE(result);
-    EXPECT_EQ(values[i], val);
+  {
+    RleDecoder decoder(buffer, len, bit_width);
+    for (size_t i = 0; i < values.size(); ++i) {
+      uint64_t val;
+      bool result = decoder.Get(&val);
+      EXPECT_TRUE(result);
+      EXPECT_EQ(values[i], val);
+    }
+  }
+
+  // Verify batch read
+  {
+    RleDecoder decoder(buffer, len, bit_width);
+    vector<int> values_read(values.size());
+    ASSERT_EQ(values.size(), decoder.GetBatch(values_read.data(), values.size()));
+    EXPECT_EQ(values, values_read);
   }
 }
 
@@ -217,11 +227,25 @@ bool CheckRoundTrip(const vector<int>& values, int bit_width) {
   int encoded_len = encoder.Flush();
   int out = 0;
 
-  RleDecoder decoder(buffer, encoded_len, bit_width);
-  for (size_t i = 0; i < values.size(); ++i) {
-    EXPECT_TRUE(decoder.Get(&out));
-    if (values[i] != out) { return false; }
+  {
+    RleDecoder decoder(buffer, encoded_len, bit_width);
+    for (size_t i = 0; i < values.size(); ++i) {
+      EXPECT_TRUE(decoder.Get(&out));
+      if (values[i] != out) { return false; }
+    }
+  }
+
+  // Verify batch read
+  {
+    RleDecoder decoder(buffer, len, bit_width);
+    vector<int> values_read(values.size());
+    if (static_cast<int>(values.size()) !=
+        decoder.GetBatch(values_read.data(), values.size())) {
+      return false;
+    }
+    if (values != values_read) { return false; }
   }
+
   return true;
 }
 


Mime
View raw message