impala-commits mailing list archives

From k...@apache.org
Subject [1/4] impala git commit: IMPALA-3804: Re-enable per-scan filtering for sequence-based scanners
Date Thu, 07 Dec 2017 17:49:50 GMT
Repository: impala
Updated Branches:
  refs/heads/master ae6ecf51e -> a94d6068c


IMPALA-3804: Re-enable per-scan filtering for sequence-based scanners

IMPALA-3798 disabled per-scan filtering for sequence-
based scanners due to a race between runtime filter
arrival and header split processing.

This commit re-enables per-scan filtering for
sequence-based files. In HdfsScanNode::ProcessSplit()
we check whether the current range is the header of a
sequence file. If so, and the filters reject the file,
the whole file is skipped.

If it is not a sequence header, but the filters reject
the partition, we call RangeComplete() on the current
scan range.
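
In rough pseudocode, the new control flow in
HdfsScanNode::ProcessSplit() looks like this (a condensed
sketch of the hdfs-scan-node.cc hunk below, not a drop-in
implementation; the surrounding setup and the 'partition',
'partition_id', and 'metadata' locals resolved earlier in
the function are elided):

  if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY,
          filter_ctxs)) {
    // Avoid leaking unread buffers in scan_range.
    scan_range->Cancel(Status::CANCELLED);
    HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
    if (metadata->is_sequence_header) {
      // Per-file ranges have not been issued yet: mark every split
      // of the file as done so the query keeps making progress.
      SkipFile(partition->file_format(), desc);
    } else {
      // Only this scan range needs to be marked as done.
      RangeComplete(partition->file_format(), desc->file_compression,
          /*skipped=*/true);
    }
    return Status::OK();
  }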

Change-Id: I4b38c26bcbe67f83efcc65a1723d766626ae3d3e
Reviewed-on: http://gerrit.cloudera.org:8080/8684
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d52fa75c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d52fa75c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d52fa75c

Branch: refs/heads/master
Commit: d52fa75cb9d7c1fae825390a6d4e7b14dd1c61fd
Parents: ae6ecf5
Author: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Authored: Wed Nov 29 14:39:22 2017 +0100
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Dec 7 07:13:29 2017 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc            |  5 +++-
 be/src/exec/hdfs-scan-node-base.cc              | 25 ++++++++++++++------
 be/src/exec/hdfs-scan-node-base.h               | 13 ++++++++++
 be/src/exec/hdfs-scan-node.cc                   | 25 ++++++++++----------
 be/src/exec/hdfs-scanner.cc                     |  4 ++--
 .../custom_cluster/test_always_false_filter.py  | 12 ++--------
 6 files changed, 51 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d52fa75c/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 7f20e31..0475ac3 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -53,13 +53,16 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   for (int i = 0; i < files.size(); ++i) {
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
+    ScanRangeMetadata* header_metadata =
+        scan_node->runtime_state()->obj_pool()->Add(new ScanRangeMetadata(*metadata));
+    header_metadata->is_sequence_header = true;
     int64_t header_size = min<int64_t>(HEADER_SIZE, files[i]->file_length);
     // The header is almost always a remote read. Set the disk id to -1 and indicate
     // it is not cached.
     // TODO: add remote disk id and plumb that through to the io mgr.  It should have
     // 1 queue for each NIC as well?
     ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
-        files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1, false,
+        files[i]->filename.c_str(), header_size, 0, header_metadata, -1, false,
         BufferOpts::Uncached());
     header_ranges.push_back(header_range);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/d52fa75c/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 62dbd6a..2f3d0c5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -478,10 +478,7 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
       static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
   if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
           filter_ctxs)) {
-    for (int j = 0; j < file->splits.size(); ++j) {
-      // Mark range as complete to ensure progress.
-      RangeComplete(format, file->file_compression, true);
-    }
+    SkipFile(format, file);
     return false;
   }
   return true;
@@ -491,6 +488,15 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts,
     const ScanRange* original_split) {
+  ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
+        new ScanRangeMetadata(partition_id, original_split));
+  return AllocateScanRange(fs, file, len, offset, metadata, disk_id, expected_local,
+      buffer_opts);
+}
+
+ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+    int64_t len, int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
+    const BufferOpts& buffer_opts) {
   DCHECK_GE(disk_id, -1);
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
@@ -498,12 +504,10 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
   // beyond the end of the file).
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
-  DCHECK_LE(offset + len, GetFileDesc(partition_id, file)->file_length)
+  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len="
<< len << ")";
   disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
 
-  ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
-        new ScanRangeMetadata(partition_id, original_split));
   ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
   range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);
   return range;
@@ -689,6 +693,13 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
   ++file_type_counts_[std::make_tuple(file_type, skipped, compression_set)];
 }
 
+void HdfsScanNodeBase::SkipFile(const THdfsFileFormat::type& file_type,
+    HdfsFileDesc* file) {
+  for (int i = 0; i < file->splits.size(); ++i) {
+    RangeComplete(file_type, file->file_compression, true);
+  }
+}
+
 void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
   const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts();
   // Initialize all order to be conjuncts.size() (after the last conjunct)

http://git-wip-us.apache.org/repos/asf/impala/blob/d52fa75c/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 923b50a..2b310af 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -86,6 +86,10 @@ struct ScanRangeMetadata {
   /// for the scanner to process.
   const io::ScanRange* original_split;
 
+  /// True, if this object belongs to a scan range which is the header of a
+  /// sequence-based file
+  bool is_sequence_header = false;
+
   ScanRangeMetadata(int64_t partition_id, const io::ScanRange* original_split)
       : partition_id(partition_id), original_split(original_split) { }
 };
@@ -209,6 +213,12 @@ class HdfsScanNodeBase : public ScanNode {
       const io::BufferOpts& buffer_opts,
       const io::ScanRange* original_split = NULL);
 
+  /// Same as above, but it takes a pointer to a ScanRangeMetadata object which contains
+  /// the partition_id, original_splits, and other information about the scan range.
+  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+      int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
+      const io::BufferOpts& buffer_opts);
+
   /// Old API for compatibility with text scanners (e.g. LZO text scanner).
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
@@ -262,6 +272,9 @@ class HdfsScanNodeBase : public ScanNode {
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);
 
+  /// Calls RangeComplete() with skipped=true for all the splits of the file
+  void SkipFile(const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
+
   /// Utility function to compute the order in which to materialize slots to allow for
   /// computing conjuncts as slots get materialized (on partial tuples).
   /// 'order' will contain for each slot, the first conjunct it is associated with.

http://git-wip-us.apache.org/repos/asf/impala/blob/d52fa75c/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 2d58c05..710a8af 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -491,20 +491,19 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
                             << " partition_id=" << partition_id
                             << "\n" << PrintThrift(runtime_state_->instance_ctx());
 
-  // IMPALA-3798: Filtering before the scanner is created can cause hangs if a header
-  // split is filtered out, for sequence-based file formats. If the scanner does not
-  // process the header split, the remaining scan ranges in the file will not be marked as
-  // done. See FilePassesFilterPredicates() for the correct logic to mark all splits in a
-  // file as done; the correct fix here is to do that for every file in a thread-safe way.
-  if (!BaseSequenceScanner::FileFormatIsSequenceBased(partition->file_format())) {
-    if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
-      // Avoid leaking unread buffers in scan_range.
-      scan_range->Cancel(Status::CANCELLED);
-      // Mark scan range as done.
-      scan_ranges_complete_counter()->Add(1);
-      progress_.Update(1);
-      return Status::OK();
+  if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
+    // Avoid leaking unread buffers in scan_range.
+    scan_range->Cancel(Status::CANCELLED);
+    HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
+    if (metadata->is_sequence_header) {
+      // File ranges haven't been issued yet, skip entire file
+      SkipFile(partition->file_format(), desc);
+    } else {
+      // Mark this scan range as done.
+      HdfsScanNodeBase::RangeComplete(partition->file_format(), desc->file_compression,
+          true);
     }
+    return Status::OK();
   }
 
   ScannerContext context(

http://git-wip-us.apache.org/repos/asf/impala/blob/d52fa75c/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index f53b7d0..2b1a3b4 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -107,8 +107,8 @@ Status HdfsScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
   do {
-    // IMPALA-3798: Split-level runtime filtering is disabled with sequence-based file
-    // formats.
+    // IMPALA-3798, IMPALA-3804: For sequence-based files, the filters are only
+    // applied in HdfsScanNode::ProcessSplit()
     bool is_sequence_based = BaseSequenceScanner::FileFormatIsSequenceBased(
         context_->partition_descriptor()->file_format());
     if (!is_sequence_based && FilterContext::CheckForAlwaysFalse(FilterStats::SPLITS_KEY,

http://git-wip-us.apache.org/repos/asf/impala/blob/d52fa75c/tests/custom_cluster/test_always_false_filter.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_always_false_filter.py b/tests/custom_cluster/test_always_false_filter.py
index e02c64a..e64a8c2 100644
--- a/tests/custom_cluster/test_always_false_filter.py
+++ b/tests/custom_cluster/test_always_false_filter.py
@@ -31,7 +31,7 @@ class TestAlwaysFalseFilter(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true")
   def test_skip_split(self, cursor):
     """IMPALA-5789: Test that always false filter filters out splits when file-level
-    filtering is disabled. The filtering is not enabled in seq-based file formats."""
+    filtering is disabled."""
     cursor.execute("SET RUNTIME_FILTER_MODE=GLOBAL")
     cursor.execute("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
     query = """select STRAIGHT_JOIN * from alltypes inner join
@@ -39,15 +39,7 @@ class TestAlwaysFalseFilter(CustomClusterTestSuite):
             on v.year = alltypes.year"""
     # Manually iterate through file formats instead of creating a test matrix to prevent
     # the cluster from restarting multiple times.
-    for table_suffix in ['_avro', '_rc', '_seq']:
-      cursor.execute("use functional" + table_suffix)
-      cursor.execute(query)
-      # Fetch all rows to finalize the query profile.
-      cursor.fetchall()
-      profile = cursor.get_profile()
-      assert re.search("Files rejected: [^0] \([^0]\)", profile) is None
-      assert re.search("Splits rejected: [^0] \([^0]\)", profile) is None
-    for table_suffix in ['', '_parquet']:
+    for table_suffix in ['', '_avro', '_parquet', '_rc', '_seq']:
       cursor.execute("use functional" + table_suffix)
       cursor.execute(query)
       # Fetch all rows to finalize the query profile.

