impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jruss...@apache.org
Subject [1/2] impala git commit: IMPALA-6419: Revert "IMPALA-6383: free memory after skipping parquet row groups"
Date Thu, 18 Jan 2018 21:57:58 GMT
Repository: impala
Updated Branches:
  refs/heads/master 35a3e186d -> ca7d03cfe


IMPALA-6419: Revert "IMPALA-6383: free memory after skipping parquet row groups"

This reverts commit 10fb24afb966c567adcf632a314f6af1826f19fc.

Change-Id: I4dd62380d02b61ca46f856b4eb40670b71e28140
Reviewed-on: http://gerrit.cloudera.org:8080/9054
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f5d73f5e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f5d73f5e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f5d73f5e

Branch: refs/heads/master
Commit: f5d73f5e76d477ae47e02df4fb69ad590363c0d6
Parents: 35a3e18
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Thu Jan 18 09:43:30 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Jan 18 21:25:28 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 26 +++++---------------------
 be/src/exec/hdfs-parquet-scanner.h  |  5 -----
 be/src/exec/scanner-context.h       |  8 ++++----
 3 files changed, 9 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f5d73f5e/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 6380722..f0f280d 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -228,7 +228,6 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
   context_->ReleaseCompletedResources(true);
-  context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
@@ -264,7 +263,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     }
   } else {
     template_tuple_pool_->FreeAll();
-    dictionary_pool_->FreeAll();
+    dictionary_pool_.get()->FreeAll();
     context_->ReleaseCompletedResources(true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
     // The scratch batch may still contain tuple data. We can get into this case if
@@ -479,6 +478,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     // Transfer resources and clear streams if there is any leftover from the previous
     // row group. We will create new streams for the next row group.
     FlushRowGroupResources(row_batch);
+    context_->ClearStreams();
     if (!advance_row_group_) {
       Status status =
           ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
@@ -619,9 +619,6 @@ Status HdfsParquetScanner::NextRowGroup() {
   while (true) {
     // Reset the parse status for the next row group.
     parse_status_ = Status::OK();
-    // Make sure that we don't have leftover resources from the file metadata scan range
-    // or previous row groups.
-    DCHECK_EQ(0, context_->NumStreams());
 
     ++row_group_idx_;
     if (row_group_idx_ >= file_metadata_.row_groups.size()) {
@@ -672,9 +669,6 @@ Status HdfsParquetScanner::NextRowGroup() {
     // of the column.
     RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
 
-    // InitColumns() may have allocated resources to scan columns. If we skip this row
-    // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
-
     // If there is a dictionary-encoded column where every value is eliminated
     // by a conjunct, the row group can be eliminated. This initializes dictionaries
     // for all columns visited.
@@ -683,12 +677,10 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
-      ReleaseSkippedRowGroupResources();
       continue;
     }
     if (skip_row_group_on_dict_filters) {
       COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
-      ReleaseSkippedRowGroupResources();
       continue;
     }
 
@@ -700,7 +692,6 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
-      ReleaseSkippedRowGroupResources();
       continue;
     }
 
@@ -739,16 +730,9 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
   context_->ReleaseCompletedResources(true);
-  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch);
-  context_->ClearStreams();
-}
-
-void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
-  dictionary_pool_->FreeAll();
-  scratch_batch_->ReleaseResources(nullptr);
-  context_->ReleaseCompletedResources(true);
-  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
-  context_->ClearStreams();
+  for (ParquetColumnReader* col_reader : column_readers_) {
+    col_reader->Close(row_batch);
+  }
 }
 
 bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {

http://git-wip-us.apache.org/repos/asf/impala/blob/f5d73f5e/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index cea06ed..99b5a60 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -642,11 +642,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Should be called after completing a row group and when returning the last batch.
   void FlushRowGroupResources(RowBatch* row_batch);
 
-  /// Releases resources associated with a row group that was skipped and closes all
-  /// column readers. Should be called after skipping a row group from which no rows
-  /// were returned.
-  void ReleaseSkippedRowGroupResources();
-
   /// Evaluates whether the column reader is eligible for dictionary predicates
   bool IsDictFilterable(ParquetColumnReader* col_reader);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f5d73f5e/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 09a4bdc..e316063 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -89,6 +89,7 @@ class ScannerContext {
   ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
       io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
+
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
 
@@ -337,8 +338,6 @@ class ScannerContext {
     return streams_[idx].get();
   }
 
-  int NumStreams() const { return streams_.size(); }
-
   /// Release completed resources for all streams, e.g. the last buffer in each stream if
   /// the current read position is at the end of the buffer. If 'done' is true all
   /// resources are freed, even if the caller has not read that data yet. After calling
@@ -355,8 +354,8 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
-  /// context.
+  /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
+  /// The stream is created in the runtime state's object pool
   Stream* AddStream(io::ScanRange* range);
 
   /// Returns false if scan_node_ is multi-threaded and has been cancelled.
@@ -371,6 +370,7 @@ class ScannerContext {
 
   RuntimeState* state_;
   HdfsScanNodeBase* scan_node_;
+
   HdfsPartitionDescriptor* partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.


Mime
View raw message