impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mi...@apache.org
Subject [1/3] impala git commit: IMPALA-5522:Use tracked memory for DictDecoder and DictEncoder
Date Mon, 08 Jan 2018 17:04:02 GMT
Repository: impala
Updated Branches:
  refs/heads/master d25607d01 -> 6a87eb20a


IMPALA-5522:Use tracked memory for DictDecoder and DictEncoder

Currently the DictDecoder and DictEncoder classes use std::vector
to store the tables mapping codewords to values and vice versa. It is
hard to detect the memory usage of these tables when they become
very large, since this memory is not accounted for by Impala's memory
management infrastructure.

This patch uses the memory tracker of HdfsScanner to track the memory used
by the dictionary in the DictDecoder class. Similarly, it uses the memory tracker
of HdfsTableSink to track the memory used by the dictionary in the DictEncoder class.

Memory for the dictionary, stored as a std::vector, is still allocated
from std::allocator, but the amount allocated is accounted for by
introducing a counter which is incremented and decremented as the
memory is consumed and released by the vector.

Testing
-------
Ran all the backend and end-to-end tests with no failures.

Change-Id: I02a3b54f6c107d19b62ad9e1c49df94175964299
Reviewed-on: http://gerrit.cloudera.org:8080/8034
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/302ec25b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/302ec25b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/302ec25b

Branch: refs/heads/master
Commit: 302ec25b2e76398f11bb5dccbdad84e06c750f22
Parents: d25607d
Author: Pranay <psingh@cloudera.com>
Authored: Mon Sep 11 14:38:49 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Sat Jan 6 01:30:36 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc |   5 +-
 be/src/exec/parquet-column-readers.cc    |   1 +
 be/src/exec/parquet-column-readers.h     |   2 +
 be/src/util/dict-encoding.h              | 138 ++++++++++++++++++++++++--
 be/src/util/dict-test.cc                 |  51 ++++++++--
 5 files changed, 173 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/302ec25b/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 8c9d92e..952c78e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -175,7 +175,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // be added.
   void Close() {
     if (compressor_.get() != nullptr) compressor_->Close();
-    if (dict_encoder_base_ != nullptr) dict_encoder_base_->ClearIndices();
+    if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close();
   }
 
   const ColumnType& type() const { return expr_eval_->root().type(); }
@@ -315,7 +315,8 @@ class HdfsParquetTableWriter::ColumnWriter :
     current_encoding_ = Encoding::PLAIN_DICTIONARY;
     next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
-        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
+        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_,
+            parent_->parent_->mem_tracker()));
     dict_encoder_base_ = dict_encoder_.get();
     page_stats_.reset(
         new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));

http://git-wip-us.apache.org/repos/asf/impala/blob/302ec25b/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 02d2950..1c1cf39 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -236,6 +236,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : BaseScalarColumnReader(parent, node, slot_desc),
+      dict_decoder_(parent->scan_node_->mem_tracker()),
       dict_decoder_init_(false),
       needs_conversion_(false) {
     if (!MATERIALIZED) {

http://git-wip-us.apache.org/repos/asf/impala/blob/302ec25b/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 17b8c0f..3a8ad70 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -372,6 +372,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
       data_page_pool_->FreeAll();
     }
     if (decompressor_ != nullptr) decompressor_->Close();
+    DictDecoderBase* dict_decoder = GetDictionaryDecoder();
+    if (dict_decoder != nullptr) dict_decoder->Close();
   }
 
   int64_t total_len() const { return metadata_->total_compressed_size; }

http://git-wip-us.apache.org/repos/asf/impala/blob/302ec25b/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index fa5e798..d916bab 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -52,6 +52,15 @@ class DictEncoderBase {
  public:
   virtual ~DictEncoderBase() {
     DCHECK(buffered_indices_.empty());
+    ReleaseBytes();
+    DCHECK_EQ(dict_bytes_cnt_, 0);
+  }
+
+  /// This function will clear the buffered_indices and
+  /// decrement the bytes used by dictionary.
+  void Close() {
+    ClearIndices();
+    ReleaseBytes();
   }
 
   /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
@@ -86,26 +95,77 @@ class DictEncoderBase {
 
   int dict_encoded_size() { return dict_encoded_size_; }
 
+  void UsedbyTest() { used_by_test_ = true;}
+
  protected:
-  DictEncoderBase(MemPool* pool)
-    : dict_encoded_size_(0), pool_(pool) {
-  }
+  DictEncoderBase(MemPool* pool, MemTracker* mem_tracker) :
+      dict_encoded_size_(0),
+      pool_(pool),
+      dict_bytes_cnt_(0),
+      dict_mem_tracker_(mem_tracker) { }
 
   /// Indices that have not yet be written out by WriteData().
   std::vector<int> buffered_indices_;
 
+  /// Memtracker Consume is called every ENC_MEM_TRACK_CNT times.
+  /// Periodicity of calling Memtracker Consume.
+  const int ENC_MEM_TRACK_CNT = 8192;
+
+  /// Number of times ConsumeBytes() was called.
+  int num_call_track_{0};
+
   /// The number of bytes needed to encode the dictionary.
-  int dict_encoded_size_;
+  int dict_encoded_size_{0};
 
   /// Pool to store StringValue data. Not owned.
-  MemPool* pool_;
+  MemPool* pool_{nullptr};
+
+  /// This will account for bytes consumed by nodes_
+  int dict_bytes_cnt_{0};
+
+  /// This will account for bytes consumed, last_accounted by memtracker
+  int dict_bytes_cnt_memtrack_{0};
+
+  /// This will track the memory used by nodes_
+  MemTracker* dict_mem_tracker_{nullptr};
+
+  /// Function to decrement the byte counter and decrease the bytes usage
+  /// of the memory tracker.
+  void ReleaseBytes() {
+    if (dict_mem_tracker_ != nullptr) {
+      dict_mem_tracker_->Release(dict_bytes_cnt_memtrack_);
+      dict_bytes_cnt_ = 0;
+      dict_bytes_cnt_memtrack_ = 0;
+      num_call_track_ = 0;
+    }
+  }
+
+  /// Used by dict-test.cc to check the usage of bytes by dictionary
+  /// is properly accounted.
+  bool used_by_test_{false};
+
+  /// Function to increment the byte counter and increase the bytes usage
+  /// of the memory tracker.
+  void ConsumeBytes(int num_bytes) {
+    if (dict_mem_tracker_ != nullptr) {
+      dict_bytes_cnt_ += num_bytes;
+      // Calling Memtracker frequently may be expensive so update
+      // the memtracker every ENC_MEM_TRACK_CNT times.
+      if (num_call_track_ % ENC_MEM_TRACK_CNT == 0 || used_by_test_ == true) {
+        // TODO: TryConsume() can be called to check if memory limit has been exceeded.
+        dict_mem_tracker_->Consume(dict_bytes_cnt_ - dict_bytes_cnt_memtrack_);
+        dict_bytes_cnt_memtrack_ = dict_bytes_cnt_;
+      }
+      num_call_track_++;
+    }
+  }
 };
 
 template<typename T>
 class DictEncoder : public DictEncoderBase {
  public:
-  DictEncoder(MemPool* pool, int encoded_value_size) :
-      DictEncoderBase(pool), buckets_(HASH_TABLE_SIZE, Node::INVALID_INDEX),
+  DictEncoder(MemPool* pool, int encoded_value_size, MemTracker* mem_tracker) :
+      DictEncoderBase(pool, mem_tracker), buckets_(HASH_TABLE_SIZE, Node::INVALID_INDEX),
       encoded_value_size_(encoded_value_size) { }
 
   /// Encode value. Returns the number of bytes added to the dictionary page length
@@ -115,6 +175,13 @@ class DictEncoder : public DictEncoderBase {
   /// written later.
   int Put(const T& value);
 
+  /// This function returns the size in bytes of the dictionary vector.
+  /// It is used by dict-test.cc for validation of bytes consumed against
+  /// memory tracked.
+  int DictByteSize() {
+    return sizeof(Node) * nodes_.size();
+  }
+
   virtual void WriteDict(uint8_t* buffer);
 
   virtual int num_entries() const { return nodes_.size(); }
@@ -168,6 +235,9 @@ class DictEncoder : public DictEncoderBase {
 /// by the caller and valid as long as this object is.
 class DictDecoderBase {
  public:
+   DictDecoderBase(MemTracker* tracker) :
+     dict_bytes_cnt_(0), dict_mem_tracker_(tracker) { }
+
   /// The rle encoded indices into the dictionary. Returns an error status if the buffer
   /// is too short or the bit_width metadata in the buffer is invalid.
   Status SetData(uint8_t* buffer, int buffer_len) {
@@ -187,7 +257,10 @@ class DictDecoderBase {
     return Status::OK();
   }
 
-  virtual ~DictDecoderBase() {}
+  virtual ~DictDecoderBase() {
+    ReleaseBytes();
+    DCHECK_EQ(dict_bytes_cnt_, 0);
+  }
 
   virtual int num_entries() const = 0;
 
@@ -195,6 +268,11 @@ class DictDecoderBase {
   /// The buffer must be large enough to receive the datatype for this dictionary.
   virtual void GetValue(int index, void* buffer) = 0;
 
+  /// This function will decrement the bytes used by dictionary, MemTracker
+  void Close() {
+    ReleaseBytes();
+  }
+
  protected:
   /// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow
   /// efficient reading in batches from data_decoder_. Increasing the batch size up to
@@ -212,13 +290,37 @@ class DictDecoderBase {
 
   /// The index of the next decoded value to return.
   int next_literal_idx_ = 0;
+
+  /// This will account for bytes consumed by dict_
+  int dict_bytes_cnt_{0};
+
+  /// This will track the memory used by dict_
+  MemTracker* dict_mem_tracker_{nullptr};
+
+  /// Function to decrement the byte counter and decrease the bytes usage
+  /// of the memory tracker.
+  void ReleaseBytes() {
+    if (dict_mem_tracker_ != nullptr) {
+      dict_mem_tracker_->Release(dict_bytes_cnt_);
+      dict_bytes_cnt_ = 0;
+    }
+  }
+
+  /// Function to increment the byte counter and increase the bytes usage
+  /// of the memory tracker.
+  void ConsumeBytes(int num_bytes) {
+    if (dict_mem_tracker_ != nullptr) {
+      dict_bytes_cnt_ += num_bytes;
+      dict_mem_tracker_->Consume(num_bytes);
+    }
+  }
 };
 
 template<typename T>
 class DictDecoder : public DictDecoderBase {
  public:
   /// Construct empty dictionary.
-  DictDecoder() {}
+  DictDecoder(MemTracker* tracker):DictDecoderBase(tracker) {}
 
   /// Initialize the decoder with an input buffer containing the dictionary.
   /// 'dict_len' is the byte length of dict_buffer.
@@ -245,7 +347,15 @@ class DictDecoder : public DictDecoderBase {
   /// the string data is from the dictionary buffer passed into the c'tor.
   bool GetNextValue(T* value) WARN_UNUSED_RESULT;
 
+  /// This function returns the size in bytes of the dictionary vector.
+  /// It is used by dict-test.cc for validation of bytes consumed against
+  /// memory tracked.
+  int DictByteSize() {
+    return sizeof(T) * dict_.size();
+  }
+
  private:
+  /// List of decoded values stored in the dict_
   std::vector<T> dict_;
 
   /// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of
@@ -293,8 +403,10 @@ inline uint32_t DictEncoder<StringValue>::Hash(const StringValue&
value) const {
 template<typename T>
 inline int DictEncoder<T>::AddToTable(const T& value, NodeIndex* bucket) {
   DCHECK_GT(encoded_value_size_, 0);
+  Node node(value, *bucket);
+  ConsumeBytes(sizeof(node));
   // Prepend the new node to this bucket's chain.
-  nodes_.push_back(Node(value, *bucket));
+  nodes_.push_back(node);
   *bucket = nodes_.size() - 1;
   dict_encoded_size_ += encoded_value_size_;
   return encoded_value_size_;
@@ -306,8 +418,10 @@ inline int DictEncoder<StringValue>::AddToTable(const StringValue&
value,
   char* ptr_copy = reinterpret_cast<char*>(pool_->Allocate(value.len));
   memcpy(ptr_copy, value.ptr, value.len);
   StringValue sv(ptr_copy, value.len);
+  Node node(sv, *bucket);
+  ConsumeBytes(sizeof(node));
   // Prepend the new node to this bucket's chain.
-  nodes_.push_back(Node(sv, *bucket));
+  nodes_.push_back(node);
   *bucket = nodes_.size() - 1;
   int bytes_added = ParquetPlainEncoder::ByteSize(sv);
   dict_encoded_size_ += bytes_added;
@@ -386,6 +500,7 @@ template<parquet::Type::type PARQUET_TYPE>
 inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
     int fixed_len_size) {
   dict_.clear();
+  ReleaseBytes();
   uint8_t* end = dict_buffer + dict_len;
   while (dict_buffer < end) {
     T value;
@@ -395,6 +510,7 @@ inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
     dict_buffer += decoded_len;
     dict_.push_back(value);
   }
+  ConsumeBytes(sizeof(T) * dict_.size());
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/302ec25b/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index 11043f0..cf438f6 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -36,10 +36,15 @@ void ValidateDict(const vector<InternalType>& values,
     const vector<InternalType>& dict_values, int fixed_buffer_byte_size) {
   set<InternalType> values_set(values.begin(), values.end());
 
+  int bytes_alloc = 0;
+  MemTracker track_encoder;
   MemTracker tracker;
   MemPool pool(&tracker);
-  DictEncoder<InternalType> encoder(&pool, fixed_buffer_byte_size);
+  DictEncoder<InternalType> encoder(&pool, fixed_buffer_byte_size, &track_encoder);
+  encoder.UsedbyTest();
   for (InternalType i: values) encoder.Put(i);
+  bytes_alloc = encoder.DictByteSize();
+  EXPECT_EQ(track_encoder.consumption(), bytes_alloc);
   EXPECT_EQ(encoder.num_entries(), values_set.size());
 
   uint8_t dict_buffer[encoder.dict_encoded_size()];
@@ -51,9 +56,12 @@ void ValidateDict(const vector<InternalType>& values,
   EXPECT_GT(data_len, 0);
   encoder.ClearIndices();
 
-  DictDecoder<InternalType> decoder;
+  MemTracker decode_tracker;
+  DictDecoder<InternalType> decoder(&decode_tracker);
   ASSERT_TRUE(decoder.template Reset<PARQUET_TYPE>(dict_buffer,
       encoder.dict_encoded_size(),fixed_buffer_byte_size));
+  bytes_alloc = decoder.DictByteSize();
+  EXPECT_EQ(decode_tracker.consumption(), bytes_alloc);
 
   // Test direct access to the dictionary via indexes
   for (int i = 0; i < dict_values.size(); ++i) {
@@ -180,7 +188,8 @@ TEST(DictTest, TestInvalidStrings) {
 
   // Test a dictionary with a string encoded with negative length. Initializing
   // the decoder should fail.
-  DictDecoder<StringValue> decoder;
+  MemTracker tracker;
+  DictDecoder<StringValue> decoder(&tracker);
   ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
       0));
 }
@@ -193,7 +202,8 @@ TEST(DictTest, TestStringBufferOverrun) {
 
   // Initializing the dictionary should fail, since the string would reference
   // invalid memory.
-  DictDecoder<StringValue> decoder;
+  MemTracker tracker;
+  DictDecoder<StringValue> decoder(&tracker);
   ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
       0));
 }
@@ -202,9 +212,13 @@ TEST(DictTest, TestStringBufferOverrun) {
 // decoder to a clean state, even if the input is not fully consumed. The RLE decoder
 // has various state that needs to be reset.
 TEST(DictTest, SetDataAfterPartialRead) {
+  int bytes_alloc = 0;
   MemTracker tracker;
+  MemTracker track_encoder;
+  MemTracker track_decoder;
   MemPool pool(&tracker);
-  DictEncoder<int> encoder(&pool, sizeof(int));
+  DictEncoder<int> encoder(&pool, sizeof(int), &track_encoder);
+  encoder.UsedbyTest();
 
   // Literal run followed by a repeated run.
   vector<int> values{1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9};
@@ -214,10 +228,12 @@ TEST(DictTest, SetDataAfterPartialRead) {
   encoder.WriteDict(dict_buffer.data());
   vector<uint8_t> data_buffer(encoder.EstimatedDataEncodedSize() * 2);
   int data_len = encoder.WriteData(data_buffer.data(), data_buffer.size());
+  bytes_alloc = encoder.DictByteSize();
+  EXPECT_EQ(track_encoder.consumption(), bytes_alloc);
   ASSERT_GT(data_len, 0);
   encoder.ClearIndices();
 
-  DictDecoder<int> decoder;
+  DictDecoder<int> decoder(&track_decoder);
   ASSERT_TRUE(decoder.template Reset<parquet::Type::INT32>(
       dict_buffer.data(), dict_buffer.size(), sizeof(int)));
 
@@ -231,13 +247,19 @@ TEST(DictTest, SetDataAfterPartialRead) {
       EXPECT_EQ(values[i], val) << num_to_decode << " " << i;
     }
   }
+  bytes_alloc = decoder.DictByteSize();
+  EXPECT_EQ(track_decoder.consumption(), bytes_alloc);
 }
 
 // Test handling of decode errors from out-of-range values.
 TEST(DictTest, DecodeErrors) {
+  int bytes_alloc = 0;
   MemTracker tracker;
+  MemTracker track_encoder;
+  MemTracker track_decoder;
   MemPool pool(&tracker);
-  DictEncoder<int> small_dict_encoder(&pool, sizeof(int));
+  DictEncoder<int> small_dict_encoder(&pool, sizeof(int), &track_encoder);
+  small_dict_encoder.UsedbyTest();
 
   // Generate a dictionary with 9 values (requires 4 bits to encode).
   vector<int> small_dict_values{1, 2, 3, 4, 5, 6, 7, 8, 9};
@@ -245,9 +267,11 @@ TEST(DictTest, DecodeErrors) {
 
   vector<uint8_t> small_dict_buffer(small_dict_encoder.dict_encoded_size());
   small_dict_encoder.WriteDict(small_dict_buffer.data());
+  bytes_alloc = small_dict_encoder.DictByteSize();
+  EXPECT_EQ(track_encoder.consumption(), bytes_alloc);
   small_dict_encoder.ClearIndices();
 
-  DictDecoder<int> small_dict_decoder;
+  DictDecoder<int> small_dict_decoder(&track_decoder);
   ASSERT_TRUE(small_dict_decoder.template Reset<parquet::Type::INT32>(
         small_dict_buffer.data(), small_dict_buffer.size(), sizeof(int)));
 
@@ -266,17 +290,20 @@ TEST(DictTest, DecodeErrors) {
   for (TestCase& test_case: test_cases) {
     // Encode the values. This will produce a dictionary with more distinct values than
     // the small dictionary that we'll use to decode it.
-    DictEncoder<int> large_dict_encoder(&pool, sizeof(int));
+    DictEncoder<int> large_dict_encoder(&pool, sizeof(int), &track_encoder);
+    large_dict_encoder.UsedbyTest();
     // Initialize the dictionary with the values already in the small dictionary.
     for (int val : small_dict_values) large_dict_encoder.Put(val);
-    large_dict_encoder.ClearIndices();
+    large_dict_encoder.Close();
 
     for (int val: test_case.second) large_dict_encoder.Put(val);
 
     vector<uint8_t> data_buffer(large_dict_encoder.EstimatedDataEncodedSize() * 2);
     int data_len = large_dict_encoder.WriteData(data_buffer.data(), data_buffer.size());
     ASSERT_GT(data_len, 0);
-    large_dict_encoder.ClearIndices();
+    bytes_alloc = large_dict_encoder.DictByteSize();
+    EXPECT_EQ(track_encoder.consumption(), bytes_alloc);
+    large_dict_encoder.Close();
 
     ASSERT_OK(small_dict_decoder.SetData(data_buffer.data(), data_buffer.size()));
     bool failed = false;
@@ -285,6 +312,8 @@ TEST(DictTest, DecodeErrors) {
       failed = !small_dict_decoder.GetNextValue(&val);
       if (failed) break;
     }
+    bytes_alloc = small_dict_decoder.DictByteSize();
+    EXPECT_EQ(track_decoder.consumption(), bytes_alloc);
     EXPECT_TRUE(failed) << "Should have detected out-of-range dict-encoded value in
test "
         << test_case.first;
   }


Mime
View raw message