impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sail...@apache.org
Subject [2/7] incubator-impala git commit: IMPALA-4572: Run COMPUTE STATS on Parquet tables with MT_DOP=4.
Date Mon, 05 Dec 2016 22:25:00 GMT
IMPALA-4572: Run COMPUTE STATS on Parquet tables with MT_DOP=4.

COMPUTE STATS on Parquet tables is run with MT_DOP=4 by default.
COMPUTE STATS on non-Parquet tables will run without MT_DOP.

Users can always override the behavior by setting MT_DOP manually.
Setting MT_DOP to 0 means a statement will be run in the
conventional execution mode (without intra-node parallelism based
on multiple fragment instances). Users can set a higher MT_DOP
even for Parquet tables.

Testing: Added a new test that checks the effective MT_DOP.
Locally ran test_mt_dop.py, test_scanners.py, test_nested_types.py,
test_compute_stats.py, and test_cancellation.py.

Change-Id: I2be3c7c9f3004e9a759224a2e5756eb6e4efa359
Reviewed-on: http://gerrit.cloudera.org:8080/5315
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7efa0831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7efa0831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7efa0831

Branch: refs/heads/master
Commit: 7efa08316ecb8f73d1c968ed602d11d40c714a1f
Parents: 6662c55
Author: Alex Behm <alex.behm@cloudera.com>
Authored: Thu Dec 1 13:58:19 2016 -0800
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Sat Dec 3 22:28:53 2016 +0000

----------------------------------------------------------------------
 common/thrift/ImpalaInternalService.thrift      |  3 +-
 .../impala/analysis/ComputeStatsStmt.java       | 20 ++++++++
 .../org/apache/impala/planner/HdfsScanNode.java |  3 +-
 .../impala/planner/SingleNodePlanner.java       | 10 ++--
 .../org/apache/impala/service/Frontend.java     | 18 ++++++-
 .../org/apache/impala/planner/PlannerTest.java  | 52 ++++++++++++++++++++
 .../apache/impala/planner/PlannerTestBase.java  |  4 +-
 .../org/apache/impala/service/FrontendTest.java |  4 +-
 tests/query_test/test_mt_dop.py                 |  4 +-
 9 files changed, 107 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 4f81e27..f18947a 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -191,7 +191,8 @@ struct TQueryOptions {
   // query per backend.
   // > 0: multi-threaded execution mode, with given dop
   // 0: single-threaded execution mode
-  44: optional i32 mt_dop = 0
+  // unset: may be set automatically to > 0 in createExecRequest(), otherwise same as 0
+  44: optional i32 mt_dop
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 84b866b..90c46a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.analysis;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -24,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HBaseTable;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
@@ -528,6 +530,24 @@ public class ComputeStatsStmt extends StatementBase {
   public String getTblStatsQuery() { return tableStatsQueryStr_; }
   public String getColStatsQuery() { return columnStatsQueryStr_; }
 
+  /**
+   * Returns true if this statement computes stats on Parquet partitions only,
+   * false otherwise.
+   */
+  public boolean isParquetOnly() {
+    if (!(table_ instanceof HdfsTable)) return false;
+    Collection<HdfsPartition> affectedPartitions = null;
+    if (partitionSet_ != null) {
+      affectedPartitions = partitionSet_.getPartitions();
+    } else {
+      affectedPartitions = ((HdfsTable) table_).getPartitions();
+    }
+    for (HdfsPartition partition: affectedPartitions) {
+      if (partition.getFileFormat() != HdfsFileFormat.PARQUET) return false;
+    }
+    return true;
+  }
+
   @Override
   public String toSql() {
     if (!isIncremental_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0aee399..9642b97 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -173,7 +173,8 @@ public class HdfsScanNode extends ScanNode {
 
     // Determine backend scan node implementation to use. The optimized MT implementation
     // is currently only supported for Parquet.
-    if (analyzer.getQueryOptions().mt_dop > 0 &&
+    if (analyzer.getQueryOptions().isSetMt_dop() &&
+        analyzer.getQueryOptions().mt_dop > 0 &&
         fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) {
       useMtScanNode_ = true;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 1634dd2..4bc8a88 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -158,14 +158,18 @@ public class SingleNodePlanner {
    * Throws a NotImplementedException if plan validation fails.
    */
   public void validatePlan(PlanNode planNode) throws NotImplementedException {
-    if (ctx_.getQueryOptions().mt_dop > 0 && !RuntimeEnv.INSTANCE.isTestEnv()
+    if (ctx_.getQueryOptions().isSetMt_dop() && ctx_.getQueryOptions().mt_dop > 0
+        && !RuntimeEnv.INSTANCE.isTestEnv()
         && (planNode instanceof JoinNode || ctx_.hasTableSink())) {
       throw new NotImplementedException(
           "MT_DOP not supported for plans with base table joins or table sinks.");
     }
 
-    // As long as MT_DOP == 0 any join can run in a single-node plan.
-    if (ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop == 0) return;
+    // As long as MT_DOP is unset or 0 any join can run in a single-node plan.
+    if (ctx_.isSingleNodeExec() &&
+        (!ctx_.getQueryOptions().isSetMt_dop() || ctx_.getQueryOptions().mt_dop == 0)) {
+      return;
+    }
 
     if (planNode instanceof NestedLoopJoinNode) {
       JoinNode joinNode = (JoinNode) planNode;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index e7eabb1..c98ba49 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -114,6 +114,7 @@ import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
@@ -978,8 +979,9 @@ public class Frontend {
       Planner planner, StringBuilder explainString) throws ImpalaException {
     TQueryCtx queryCtx = planner.getQueryCtx();
     AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
-    boolean isMtExec =
-        analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0;
+    boolean isMtExec = analysisResult.isQueryStmt() &&
+        queryCtx.request.query_options.isSetMt_dop() &&
+        queryCtx.request.query_options.mt_dop > 0;
 
     List<PlanFragment> planRoots = Lists.newArrayList();
     TQueryExecRequest result = new TQueryExecRequest();
@@ -1038,6 +1040,7 @@ public class Frontend {
     result.setAccess_events(analysisResult.getAccessEvents());
     result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
 
+    TQueryOptions queryOptions = queryCtx.request.query_options;
     if (analysisResult.isCatalogOp()) {
       result.stmt_type = TStmtType.DDL;
       createCatalogOpRequest(analysisResult, result);
@@ -1045,6 +1048,15 @@ public class Frontend {
       if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
         result.catalog_op_request.setLineage_graph(thriftLineageGraph);
       }
+      // Set MT_DOP=4 for COMPUTE STATS on Parquet tables, unless the user has already
+      // provided another value for MT_DOP.
+      if (!queryOptions.isSetMt_dop() &&
+          analysisResult.isComputeStatsStmt() &&
+          analysisResult.getComputeStatsStmt().isParquetOnly()) {
+        queryOptions.setMt_dop(4);
+      }
+      // If unset, set MT_DOP to 0 to simplify the rest of the code.
+      if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
       // All DDL operations except for CTAS are done with analysis at this point.
       if (!analysisResult.isCreateTableAsSelectStmt()) return result;
     } else if (analysisResult.isLoadDataStmt()) {
@@ -1061,6 +1073,8 @@ public class Frontend {
       result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
       return result;
     }
+    // If unset, set MT_DOP to 0 to simplify the rest of the code.
+    if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
 
     // create TQueryExecRequest
     Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index dce32a6..8c48ee4 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -17,14 +17,22 @@
 
 package org.apache.impala.planner;
 
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Db;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import jline.internal.Preconditions;
+
 // All planner tests, except for S3 specific tests should go here.
 public class PlannerTest extends PlannerTestBase {
 
@@ -308,4 +316,48 @@ public class PlannerTest extends PlannerTestBase {
       RuntimeEnv.INSTANCE.setTestEnv(true);
     }
   }
+
+  @Test
+  public void testComputeStatsMtDop() {
+    for (int mtDop: new int[] {-1, 0, 1, 16}) {
+      int effectiveMtDop = (mtDop != -1) ? mtDop : 0;
+      // MT_DOP is not set automatically for stmt other than COMPUTE STATS.
+      testEffectiveMtDop(
+          "select * from functional_parquet.alltypes", mtDop, effectiveMtDop);
+      // MT_DOP is not set automatically for COMPUTE STATS on non-Parquet tables.
+      testEffectiveMtDop(
+          "compute stats functional.alltypes", mtDop, effectiveMtDop);
+    }
+    // MT_DOP is set automatically for COMPUTE STATS on Parquet tables,
+    // but can be overridden by a user-provided MT_DOP.
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", -1, 4);
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", 0, 0);
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", 1, 1);
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", 16, 16);
+  }
+
+  /**
+   * Creates an exec request for 'stmt' setting the MT_DOP query option to 'userMtDop',
+   * or leaving it unset if 'userMtDop' is -1. Asserts that the MT_DOP of the generated
+   * exec request is equal to 'expectedMtDop'.
+   */
+  private void testEffectiveMtDop(String stmt, int userMtDop, int expectedMtDop) {
+    TQueryCtx queryCtx = TestUtils.createQueryContext(
+        Catalog.DEFAULT_DB, System.getProperty("user.name"));
+    queryCtx.request.setStmt(stmt);
+    queryCtx.request.query_options = defaultQueryOptions();
+    if (userMtDop != -1) queryCtx.request.query_options.setMt_dop(userMtDop);
+    StringBuilder explainBuilder = new StringBuilder();
+    TExecRequest request = null;
+    try {
+      request = frontend_.createExecRequest(queryCtx, explainBuilder);
+    } catch (ImpalaException e) {
+      Assert.fail("Failed to create exec request for '" + stmt + "': " + e.getMessage());
+    }
+    Preconditions.checkNotNull(request);
+    int actualMtDop = -1;
+    if (request.query_options.isSetMt_dop()) actualMtDop = request.query_options.mt_dop;
+    // Check that the effective MT_DOP is as expected.
+    Assert.assertEquals(actualMtDop, expectedMtDop);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 4fff233..5e6dbc7 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -364,7 +364,7 @@ public class PlannerTestBase extends FrontendTestBase {
   /**
    * Merge the options of b into a and return a
    */
-  private TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) {
+  protected TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) {
     for(TQueryOptions._Fields f : TQueryOptions._Fields.values()) {
       if (b.isSet(f)) {
         a.setFieldValue(f, b.getFieldValue(f));
@@ -484,7 +484,7 @@ public class PlannerTestBase extends FrontendTestBase {
           ImpalaInternalServiceConstants.NUM_NODES_ALL);
     }
     if (section == Section.PARALLELPLANS) {
-      queryCtx.request.query_options.mt_dop = 2;
+      queryCtx.request.query_options.setMt_dop(2);
     }
     ArrayList<String> expectedPlan = testCase.getSectionContents(section);
     boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/service/FrontendTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index dfbdb12..dd6a6c8 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -32,7 +32,6 @@ import org.apache.hive.service.cli.thrift.TGetInfoReq;
 import org.apache.hive.service.cli.thrift.TGetSchemasReq;
 import org.apache.hive.service.cli.thrift.TGetTablesReq;
 import org.junit.Test;
-
 import org.apache.impala.analysis.AuthorizationTest;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.Catalog;
@@ -41,11 +40,14 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TMetadataOpcode;
 import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index ff60b60..1d522fd 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -25,7 +25,9 @@ from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_vector import TestDimension
 from tests.common.test_vector import TestVector
 
-MT_DOP_VALUES = [1, 2, 8]
+# COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include
+# the value 0 to cover the non-MT path as well.
+MT_DOP_VALUES = [0, 1, 2, 8]
 
 class TestMtDop(ImpalaTestSuite):
   @classmethod


Mime
View raw message