impala-commits mailing list archives

From he...@apache.org
Subject [1/8] incubator-impala git commit: IMPALA-2328: Address additional comments
Date Sun, 05 Mar 2017 23:37:03 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 157da298d -> 13837262b


IMPALA-2328: Address additional comments

- test_parquet_stats.py was missing, so the tests were not run during
GVO.

- The tests in parquet_stats.test assume that queries execute in a
single fragment, so they now run with 'num_nodes = 1'.

- Parquet columns are now resolved correctly.

- Parquet files with missing columns are now handled correctly.

- Predicates with implicit casts can now be evaluated against
parquet::Statistics (see the example below).

- This change also cleans up some old friend declarations I came across.
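
For example, the first of these two queries (both taken from the tests
added in this change) contains an implicit cast of tinyint_col to
BIGINT and can now be evaluated against the row group stats; the second
uses an explicit cast and remains unsupported:

  select count(*) from functional_parquet.alltypessmall
  where tinyint_col > 1000000000000;

  select count(*) from functional_parquet.alltypessmall
  where '0' > cast(tinyint_col as string);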

Change-Id: I54c205fad7afc4a0b0a7d0f654859de76db29a02
Reviewed-on: http://gerrit.cloudera.org:8080/6147
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/996fb5ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/996fb5ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/996fb5ea

Branch: refs/heads/master
Commit: 996fb5eab3e9cefc738afbc8171b901ad116dfb8
Parents: 157da29
Author: Lars Volker <lv@cloudera.com>
Authored: Thu Feb 23 22:26:32 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Fri Mar 3 02:34:10 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             | 41 ++++++++++++----
 be/src/exprs/case-expr.h                        |  1 -
 be/src/exprs/expr.h                             |  4 --
 .../org/apache/impala/planner/HdfsScanNode.java |  9 ++--
 .../queries/QueryTest/parquet_stats.test        | 50 ++++++++++++++++++++
 tests/query_test/test_parquet_stats.py          | 41 ++++++++++++++++
 6 files changed, 129 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 2b0454e..ab40713 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -193,7 +193,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
   const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
-  if (min_max_tuple_desc) {
+  if (min_max_tuple_desc != nullptr) {
     int64_t tuple_size = min_max_tuple_desc->byte_size();
     if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) {
       return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
@@ -484,16 +484,39 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_g
   Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer());
   min_max_tuple->Init(tuple_size);
 
-  DCHECK(min_max_tuple_desc->slots().size() == min_max_conjuncts_ctxs_.size());
+  DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjuncts_ctxs_.size());
 
   min_max_conjuncts_ctxs_to_eval_.clear();
   for (int i = 0; i < min_max_conjuncts_ctxs_.size(); ++i) {
     SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
     ExprContext* conjunct = min_max_conjuncts_ctxs_[i];
-    Expr* e = conjunct->root();
-    DCHECK(e->GetChild(0)->is_slotref());
 
-    int col_idx = slot_desc->col_pos() - scan_node_->num_partition_keys();
+    // Resolve column path to determine col idx.
+    SchemaNode* node = nullptr;
+    bool pos_field;
+    bool missing_field;
+    RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(),
+        &node, &pos_field, &missing_field));
+
+    if (missing_field) {
+      // We are selecting a column that is not in the file. We would set its slot to NULL
+      // during the scan, so any predicate would evaluate to false. Return early. NULL
+      // comparisons cannot happen here, since predicates with NULL literals are filtered
+      // in the frontend.
+      *skip_row_group = true;
+      return Status::OK();
+    }
+
+    if (pos_field) {
+      // The planner should not send predicates with 'pos' for stats filtering to the BE.
+      // In case there is a bug, we return an error, which will abort the query.
+      stringstream err;
+      err << "Statistics not supported for pos fields: " << slot_desc->DebugString();
+      DCHECK(false) << err.str();
+      return Status(err.str());
+    }
+
+    int col_idx = node->col_idx;
     DCHECK(col_idx < row_group.columns.size());
 
     if (!ParquetMetadataUtils::HasRowGroupStats(row_group, col_idx)) continue;
@@ -503,17 +526,17 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(const parquet::RowGroup& row_g
     void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset());
     const ColumnType& col_type = slot_desc->type();
 
-    if (e->function_name() == "lt" || e->function_name() == "le") {
+    const string& fn_name = conjunct->root()->function_name();
+    if (fn_name == "lt" || fn_name == "le") {
       // We need to get min stats.
       stats_read = ColumnStatsBase::ReadFromThrift(stats, col_type,
           ColumnStatsBase::StatsField::MIN, slot);
-    } else if (e->function_name() == "gt" || e->function_name() == "ge") {
+    } else if (fn_name == "gt" || fn_name == "ge") {
       // We need to get max stats.
       stats_read = ColumnStatsBase::ReadFromThrift(stats, col_type,
           ColumnStatsBase::StatsField::MAX, slot);
     } else {
-      DCHECK(false) << "Unsupported function name for statistics evaluation: "
-          << e->function_name();
+      DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name;
     }
     if (stats_read) min_max_conjuncts_ctxs_to_eval_.push_back(conjunct);
   }
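
To see why resolving the column path matters above, here is a sketch
built from the name_resolve test added later in this change (table,
column, and option names come from that test). After the columns are
reordered, a position-based lookup would read another column's stats:

  create table name_resolve stored as parquet as
    select * from functional_parquet.alltypessmall;
  alter table name_resolve replace columns (int_col int, bool_col boolean,
    tinyint_col tinyint, smallint_col smallint, id int);
  set parquet_fallback_schema_resolution=NAME;
  -- With name-based resolution, 'id' must map to the file's 'id' column.
  -- The old position-based lookup would pick up int_col's stats and
  -- incorrectly filter every row group for this predicate.
  select count(*) from name_resolve where id > 10;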

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/be/src/exprs/case-expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/case-expr.h b/be/src/exprs/case-expr.h
index 87bab76..a59f5c2 100644
--- a/be/src/exprs/case-expr.h
+++ b/be/src/exprs/case-expr.h
@@ -45,7 +45,6 @@ class CaseExpr: public Expr {
 
  protected:
   friend class Expr;
-  friend class ComputeFunctions;
   friend class ConditionalFunctions;
   friend class DecimalOperators;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/be/src/exprs/expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 8f14d6d..8919f59 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -255,10 +255,7 @@ class Expr {
 
  protected:
   friend class AggFnEvaluator;
-  friend class CastExpr;
-  friend class ComputeFunctions;
   friend class DecimalFunctions;
-  friend class DecimalLliteral;
   friend class DecimalOperators;
   friend class MathFunctions;
   friend class StringFunctions;
@@ -267,7 +264,6 @@ class Expr {
   friend class UtilityFunctions;
   friend class CaseExpr;
   friend class InPredicate;
-  friend class FunctionCall;
   friend class ScalarFnCall;
 
   Expr(const ColumnType& type, bool is_constant, bool is_slotref);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index c5c552b..b1cd86a 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -336,12 +336,15 @@ public class HdfsScanNode extends ScanNode {
       BinaryPredicate binaryPred = (BinaryPredicate) pred;
 
       // We only support slot refs on the left hand side of the predicate, a rewriting
-      // rule makes sure that all compatible exprs are rewritten into this form.
-      if (!(binaryPred.getChild(0) instanceof SlotRef)) continue;
-      SlotRef slot = (SlotRef) binaryPred.getChild(0);
+      // rule makes sure that all compatible exprs are rewritten into this form. Only
+      // implicit casts are supported.
+      SlotRef slot = binaryPred.getChild(0).unwrapSlotRef(true);
+      if (slot == null) continue;
 
       // This node is a table scan, so this must be a scanning slot.
       Preconditions.checkState(slot.getDesc().isScanSlot());
+      // If the column is null, then this can be a 'pos' scanning slot of a nested type.
+      if (slot.getDesc().getColumn() == null) continue;
 
       Expr constExpr = binaryPred.getChild(1);
       // Only constant exprs can be evaluated against parquet::Statistics. This includes
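
A note on the rewriting rule referenced in the comment above: the
frontend normalizes binary predicates so that the slot reference ends
up on the left-hand side. As a rough illustration (the normalized form
shown is an assumption based on that comment, not output of this diff):

  -- As written by the user:
  select id, bool_col from functional_parquet.alltypessmall
  where 5 + 5 < int_col;
  -- Roughly what stats filtering sees after normalization and constant
  -- folding:
  --   ... where int_col > 10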

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
index 327fba6..6f9393d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test
@@ -229,3 +229,53 @@ select id, bool_col from functional_parquet.alltypessmall where 5 + 5 < int_col
 row_regex: .*NumRowGroups: 4 .*
 row_regex: .*NumStatsFilteredRowGroups: 4 .*
 ====
+---- QUERY
+# Test name based column resolution
+create table name_resolve stored as parquet as select * from functional_parquet.alltypessmall;
+alter table name_resolve replace columns (int_col int, bool_col boolean, tinyint_col tinyint, smallint_col smallint, id int);
+set parquet_fallback_schema_resolution=NAME;
+# If this picks up the stats from int_col, then it will filter all row groups and return
+# an incorrect result.
+select count(*) from name_resolve where id > 10;
+---- RESULTS
+89
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 1 .*
+row_regex: .*NumStatsFilteredRowGroups: 0 .*
+====
+---- QUERY
+# Query that has an implicit cast to a larger integer type
+select count(*) from functional_parquet.alltypessmall where tinyint_col > 1000000000000
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 4 .*
+====
+---- QUERY
+# Predicates with explicit casts are not supported when evaluating parquet::Statistics.
+select count(*) from functional_parquet.alltypessmall where '0' > cast(tinyint_col as string)
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 4 .*
+row_regex: .*NumStatsFilteredRowGroups: 0 .*
+====
+---- QUERY
+# Explicit casts between numerical types can violate the transitivity of "min()", so they
+# are not supported when evaluating parquet::Statistics.
+select count(*) from functional_parquet.alltypes where cast(id as tinyint) < 10;
+---- RESULTS
+3878
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 24 .*
+row_regex: .*NumStatsFilteredRowGroups: 0 .*
+====
+---- QUERY
+select count(*) from functional_parquet.complextypestbl.int_array where pos < 5;
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+row_regex: .*NumRowGroups: 2 .*
+row_regex: .*NumStatsFilteredRowGroups: 0 .*
+====
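
To reproduce these RUNTIME_PROFILE checks by hand, a sketch of an
impala-shell session (counter names are taken from the expected output
above; the surrounding profile layout may differ):

  set num_nodes=1;
  select count(*) from functional_parquet.alltypessmall
  where tinyint_col > 1000000000000;
  profile;
  -- Search the scan node section of the output for "NumRowGroups" and
  -- "NumStatsFilteredRowGroups".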

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/996fb5ea/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
new file mode 100644
index 0000000..502301a
--- /dev/null
+++ b/tests/query_test/test_parquet_stats.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+class TestParquetStats(ImpalaTestSuite):
+  """
+  This suite tests runtime optimizations based on Parquet statistics.
+  """
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestParquetStats, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_parquet_stats(self, vector, unique_database):
+    # The test makes assumptions about the number of row groups that are processed and
+    # skipped inside a fragment, so we ensure that the tests run in a single fragment.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/parquet_stats', vector, use_db=unique_database)

