impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [01/15] incubator-impala git commit: IMPALA-4794: Grouping distinct agg plan robust to data skew
Date Thu, 17 Aug 2017 03:14:17 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 158fd330b -> c7db60aa4


IMPALA-4794: Grouping distinct agg plan robust to data skew

This patch changes the query plan for grouping distinct aggregations to
be more robust to data skew in the grouping expressions. The existing
plan partitions data between phase-1 and phase-2 by the grouping exprs.
Under this strategy the data skewness on the grouping exprs directly
impacts performance. The new plan partitions data by both the grouping
exprs and distinct agg exprs, then adds one more aggregation and
exchange node. The new plan is more robust to data skew but does more
work than the old plan.

Testing: Modified existing planner tests which already provide
sufficient coverage. The pattern is that the distinct agg exprs are
added to the first exchange node, followed by an additional merge agg
and exchange node.

Change-Id: I7bdada0e328b555900c7b7ff8aabc8eb15ae8fa9
Reviewed-on: http://gerrit.cloudera.org:8080/7643
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b660bd65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b660bd65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b660bd65

Branch: refs/heads/master
Commit: b660bd652f69913ac558d5e6ab5c4bdee3d97601
Parents: 158fd33
Author: Tianyi Wang <twang@cloudera.com>
Authored: Wed Aug 16 11:05:07 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Aug 16 23:20:22 2017 +0000

----------------------------------------------------------------------
 .../impala/planner/DistributedPlanner.java      | 135 +++++++++----------
 .../queries/PlannerTest/aggregation.test        |  98 +++++++++-----
 .../queries/PlannerTest/distinct.test           |  60 ++++++---
 .../queries/PlannerTest/insert.test             |  10 +-
 .../queries/PlannerTest/kudu.test               |  14 +-
 .../queries/PlannerTest/tpch-all.test           |  24 +++-
 .../queries/PlannerTest/tpch-nested.test        |  12 +-
 7 files changed, 219 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 2266625..6571036 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -855,98 +855,87 @@ public class DistributedPlanner {
 
   /**
    * Returns a fragment that materialises the final result of a distinct aggregation
-   * where 'childFragment' is a partitioned fragment with the first phase aggregation
-   * as its root and 'node' is the second phase of the distinct aggregation.
+   * where 'childFragment' is a partitioned fragment with the phase-1 aggregation
+   * as its root.
    */
-  private PlanFragment createPhase2DistinctAggregationFragment(AggregationNode node,
-      PlanFragment childFragment, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
-    boolean hasGrouping = !groupingExprs.isEmpty();
-
-    // The first-phase aggregation node is already in the child fragment.
-    Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot());
-
-    AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo();
-    List<Expr> partitionExprs = null;
-    if (hasGrouping) {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment, hash-partitioned on grouping exprs:
-      //   * merge agg of phase 1
-      //   * phase 2 agg
-      // The output partition exprs of the child are the (input) grouping exprs of the
-      // parent. The grouping exprs reference the output tuple of the 1st phase, but the
-      // partitioning happens on the intermediate tuple of the 1st phase.
-      partitionExprs = Expr.substituteList(
-          groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(),
-          ctx_.getRootAnalyzer(), false);
-    } else {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment 1, hash-partitioned on distinct exprs:
-      //   * merge agg of phase 1
-      //   * phase 2 agg
-      // - merge fragment 2, unpartitioned:
-      //   * merge agg of phase 2
-      partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
-          firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(), false);
-    }
-
-    PlanFragment mergeFragment = null;
+  private PlanFragment createPhase2DistinctAggregationFragment(
+      AggregationNode phase2AggNode, PlanFragment childFragment,
+      ArrayList<PlanFragment> fragments) throws ImpalaException {
+    // The phase-1 aggregation node is already in the child fragment.
+    Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot());
+
+    AggregateInfo phase1AggInfo = ((AggregationNode) phase2AggNode.getChild(0))
+        .getAggInfo();
+    // We need to do
+    // - child fragment:
+    //   * phase-1 aggregation
+    // - first merge fragment, hash-partitioned on grouping and distinct exprs:
+    //   * merge agg of phase-1
+    //   * phase-2 agg
+    // - second merge fragment, hash-partitioned on the grouping exprs, or
+    //   unpartitioned if there are no grouping exprs:
+    //   * merge agg of phase-2
+    // With grouping, the output partition exprs of the child are the (input) grouping
+    // exprs of the parent. The grouping exprs reference the output tuple of phase-1,
+    // but the partitioning happens on the intermediate tuple of phase-1.
+    ArrayList<Expr> partitionExprs = Expr.substituteList(
+        phase1AggInfo.getGroupingExprs(), phase1AggInfo.getIntermediateSmap(),
+        ctx_.getRootAnalyzer(), false);
+
+    PlanFragment firstMergeFragment;
     boolean childHasCompatPartition = ctx_.getRootAnalyzer().equivSets(partitionExprs,
         childFragment.getDataPartition().getPartitionExprs());
     if (childHasCompatPartition) {
       // The data is already partitioned on the required expressions, we can skip the
-      // phase 1 merge step.
-      childFragment.addPlanRoot(node);
-      mergeFragment = childFragment;
+      // phase-1 merge step.
+      childFragment.addPlanRoot(phase2AggNode);
+      firstMergeFragment = childFragment;
     } else {
       DataPartition mergePartition = DataPartition.hashPartitioned(partitionExprs);
       // Convert the existing node to a preaggregation.
-      AggregationNode preaggNode = (AggregationNode)node.getChild(0);
+      AggregationNode preaggNode = (AggregationNode)phase2AggNode.getChild(0);
       preaggNode.setIsPreagg(ctx_);
 
-      // place a merge aggregation step for the 1st phase in a new fragment
-      mergeFragment = createParentFragment(childFragment, mergePartition);
-      AggregateInfo phase1MergeAggInfo = firstPhaseAggInfo.getMergeAggInfo();
+      // place phase-1 merge aggregation step in a new fragment
+      firstMergeFragment = createParentFragment(childFragment, mergePartition);
+      AggregateInfo phase1MergeAggInfo = phase1AggInfo.getMergeAggInfo();
       AggregationNode phase1MergeAggNode =
           new AggregationNode(ctx_.getNextNodeId(), preaggNode, phase1MergeAggInfo);
       phase1MergeAggNode.init(ctx_.getRootAnalyzer());
       phase1MergeAggNode.unsetNeedsFinalize();
       phase1MergeAggNode.setIntermediateTuple();
-      mergeFragment.addPlanRoot(phase1MergeAggNode);
+      firstMergeFragment.addPlanRoot(phase1MergeAggNode);
 
-      // the 2nd-phase aggregation consumes the output of the merge agg;
-      // if there is a limit, it had already been placed with the 2nd aggregation
+      // the phase-2 aggregation consumes the output of the phase-1 merge agg;
+      // if there is a limit, it had already been placed with the phase-2 aggregation
       // step (which is where it should be)
-      mergeFragment.addPlanRoot(node);
+      firstMergeFragment.addPlanRoot(phase2AggNode);
+      fragments.add(firstMergeFragment);
     }
-
-    if (!hasGrouping) {
-      // place the merge aggregation of the 2nd phase in an unpartitioned fragment;
-      // add preceding merge fragment at end
-      if (mergeFragment != childFragment) fragments.add(mergeFragment);
-
-      node.unsetNeedsFinalize();
-      node.setIntermediateTuple();
-      // Any limit should be placed in the final merge aggregation node
-      long limit = node.getLimit();
-      node.unsetLimit();
-      mergeFragment = createParentFragment(mergeFragment, DataPartition.UNPARTITIONED);
-      AggregateInfo phase2MergeAggInfo = node.getAggInfo().getMergeAggInfo();
-      AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), node,
-          phase2MergeAggInfo);
-      phase2MergeAggNode.init(ctx_.getRootAnalyzer());
-      // Transfer having predicates. If hasGrouping == true, the predicates should
-      // instead be evaluated by the 2nd phase agg (the predicates are already there).
-      node.transferConjuncts(phase2MergeAggNode);
-      phase2MergeAggNode.setLimit(limit);
-      mergeFragment.addPlanRoot(phase2MergeAggNode);
+    phase2AggNode.unsetNeedsFinalize();
+    phase2AggNode.setIntermediateTuple();
+    // Limit should be applied at the final merge aggregation node
+    long limit = phase2AggNode.getLimit();
+    phase2AggNode.unsetLimit();
+
+    DataPartition mergePartition;
+    if (phase2AggNode.getAggInfo().getGroupingExprs().isEmpty()) {
+      mergePartition = DataPartition.UNPARTITIONED;
+    } else {
+      phase2AggNode.setIsPreagg(ctx_);
+      mergePartition = DataPartition.hashPartitioned(
+          phase2AggNode.getAggInfo().getMergeAggInfo().getGroupingExprs());
     }
-    return mergeFragment;
+    PlanFragment secondMergeFragment =
+        createParentFragment(firstMergeFragment, mergePartition);
+    AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
+        phase2AggNode, phase2AggNode.getAggInfo().getMergeAggInfo());
+    phase2MergeAggNode.init(ctx_.getRootAnalyzer());
+    phase2MergeAggNode.setLimit(limit);
+    // Transfer having predicates to final merge agg node
+    phase2AggNode.transferConjuncts(phase2MergeAggNode);
+    secondMergeFragment.addPlanRoot(phase2MergeAggNode);
+    return secondMergeFragment;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index b5c3970..15db74d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -345,18 +345,24 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-07:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
-04:AGGREGATE [FINALIZE]
-|  output: count(int_col)
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
 |  group by: t.bigint_col
 |  limit: 10
 |
+07:EXCHANGE [HASH(t.bigint_col)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(int_col)
+|  group by: t.bigint_col
+|
 06:AGGREGATE
 |  group by: t.bigint_col, int_col
 |
-05:EXCHANGE [HASH(t.bigint_col)]
+05:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  group by: bigint_col, int_col
@@ -458,19 +464,25 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-07:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
-04:AGGREGATE [FINALIZE]
-|  output: count(int_col), count:merge(smallint_col)
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
 |
+07:EXCHANGE [HASH(t.bigint_col)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(int_col), count:merge(smallint_col)
+|  group by: t.bigint_col
+|
 06:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: t.bigint_col, int_col
 |
-05:EXCHANGE [HASH(t.bigint_col)]
+05:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 03:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
@@ -519,19 +531,25 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-10:EXCHANGE [UNPARTITIONED]
+12:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
-05:AGGREGATE [FINALIZE]
-|  output: count(int_col), count:merge(smallint_col)
+11:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
 |  limit: 10
 |
+10:EXCHANGE [HASH(t.bigint_col)]
+|
+05:AGGREGATE [STREAMING]
+|  output: count(int_col), count:merge(smallint_col)
+|  group by: t.bigint_col
+|
 09:AGGREGATE
 |  output: count:merge(smallint_col)
 |  group by: t.bigint_col, int_col
 |
-08:EXCHANGE [HASH(t.bigint_col)]
+08:EXCHANGE [HASH(t.bigint_col,int_col)]
 |
 04:AGGREGATE [STREAMING]
 |  output: count(smallint_col)
@@ -569,7 +587,7 @@ PLAN-ROOT SINK
 |  group by: l_partkey
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.74MB
+   partitions=1/1 files=3 size=193.92MB
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -592,7 +610,7 @@ PLAN-ROOT SINK
 |  group by: l_partkey
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.74MB
+   partitions=1/1 files=3 size=193.92MB
 ====
 # test that aggregations are not placed below an unpartitioned exchange with a limit
 select count(*) from (select * from functional.alltypes limit 10) t
@@ -883,16 +901,22 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(CAST(timestamp_col AS STRING)), group_concat:merge(CAST(timestamp_col AS STRING))
+|  group by: year
+|
+05:EXCHANGE [HASH(year)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
 |
 04:AGGREGATE
 |  group by: year, CAST(timestamp_col AS STRING)
 |
-03:EXCHANGE [HASH(year)]
+03:EXCHANGE [HASH(year,CAST(timestamp_col AS STRING))]
 |
 01:AGGREGATE [STREAMING]
 |  group by: year, CAST(timestamp_col AS STRING)
@@ -998,9 +1022,15 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(date_string_col), group_concat:merge(date_string_col, '-'), count:merge(*)
+|  group by: month, year
+|
+05:EXCHANGE [HASH(month,year)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
 |  group by: month, year
 |
@@ -1008,7 +1038,7 @@ PLAN-ROOT SINK
 |  output: count:merge(*)
 |  group by: month, year, date_string_col
 |
-03:EXCHANGE [HASH(month,year)]
+03:EXCHANGE [HASH(month,year,date_string_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*)
@@ -1093,12 +1123,12 @@ PLAN-ROOT SINK
 |--05:EXCHANGE [HASH(o_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.00MB
+|     partitions=1/1 files=2 size=54.20MB
 |
 04:EXCHANGE [HASH(c_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.customer]
-   partitions=1/1 files=1 size=12.27MB
+   partitions=1/1 files=1 size=12.34MB
    predicates: c_nationkey = 16
    runtime filters: RF000 -> c_custkey
 ====
@@ -1133,12 +1163,12 @@ PLAN-ROOT SINK
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.27MB
+|     partitions=1/1 files=1 size=12.34MB
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.00MB
+   partitions=1/1 files=2 size=54.20MB
    runtime filters: RF000 -> o_custkey
 ====
 # Distinct grouping aggregation where input is partitioned on distinct and grouping exprs.
@@ -1150,9 +1180,15 @@ group by 1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-07:EXCHANGE [UNPARTITIONED]
+09:EXCHANGE [UNPARTITIONED]
 |
-04:AGGREGATE [FINALIZE]
+08:AGGREGATE [FINALIZE]
+|  output: count:merge(c_custkey)
+|  group by: c_custkey
+|
+07:EXCHANGE [HASH(c_custkey)]
+|
+04:AGGREGATE [STREAMING]
 |  output: count(c_custkey)
 |  group by: c_custkey
 |
@@ -1166,12 +1202,12 @@ PLAN-ROOT SINK
 |--06:EXCHANGE [HASH(c_custkey)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.27MB
+|     partitions=1/1 files=1 size=12.34MB
 |
 05:EXCHANGE [HASH(o_custkey)]
 |
 00:SCAN HDFS [tpch_parquet.orders]
-   partitions=1/1 files=2 size=54.00MB
+   partitions=1/1 files=2 size=54.20MB
    runtime filters: RF000 -> o_custkey
 ====
 # Complex aggregation when two joins and an agg end up in same fragment.
@@ -1205,7 +1241,7 @@ PLAN-ROOT SINK
 |--08:EXCHANGE [BROADCAST]
 |  |
 |  02:SCAN HDFS [tpch_parquet.customer]
-|     partitions=1/1 files=1 size=12.27MB
+|     partitions=1/1 files=1 size=12.34MB
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk
@@ -1214,13 +1250,13 @@ PLAN-ROOT SINK
 |--07:EXCHANGE [HASH(o_orderkey,o_clerk)]
 |  |
 |  01:SCAN HDFS [tpch_parquet.orders]
-|     partitions=1/1 files=2 size=54.00MB
+|     partitions=1/1 files=2 size=54.20MB
 |     runtime filters: RF000 -> o_custkey, RF001 -> o_comment
 |
 06:EXCHANGE [HASH(l_orderkey,l_returnflag)]
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.61MB
+   partitions=1/1 files=3 size=193.92MB
    runtime filters: RF002 -> l_orderkey, RF003 -> l_returnflag
 ====
 # IMPALA-4263: Grouping agg needs a merge step because the grouping exprs reference a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
index 0a41fb9..1e6b3ba 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test
@@ -107,16 +107,22 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(if(int_col IS NULL, NULL, bigint_col))
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(if(int_col IS NULL, NULL, bigint_col))
 |  group by: tinyint_col
 |
 04:AGGREGATE
 |  group by: tinyint_col, int_col, bigint_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col,bigint_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: tinyint_col, int_col, bigint_col
@@ -143,16 +149,22 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), sum:merge(int_col)
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col), sum(int_col)
 |  group by: tinyint_col
 |
 04:AGGREGATE
 |  group by: tinyint_col, int_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: tinyint_col, int_col
@@ -217,9 +229,15 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), min:merge(smallint_col), max:merge(string_col)
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col), min:merge(smallint_col), max:merge(string_col)
 |  group by: tinyint_col
 |
@@ -227,7 +245,7 @@ PLAN-ROOT SINK
 |  output: min:merge(smallint_col), max:merge(string_col)
 |  group by: tinyint_col, int_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: min(smallint_col), max(string_col)
@@ -256,9 +274,15 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col), sum:merge(int_col), count:merge(*), sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col)
+|  group by: tinyint_col
+|
+05:EXCHANGE [HASH(tinyint_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col), sum(int_col), count:merge(*), sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col)
 |  group by: tinyint_col
 |
@@ -266,7 +290,7 @@ PLAN-ROOT SINK
 |  output: count:merge(*), sum:merge(int_col), min:merge(smallint_col), max:merge(bigint_col)
 |  group by: tinyint_col, int_col
 |
-03:EXCHANGE [HASH(tinyint_col)]
+03:EXCHANGE [HASH(tinyint_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  output: count(*), sum(int_col), min(smallint_col), max(bigint_col)
@@ -508,16 +532,22 @@ select * from (select count(distinct int_col) cd from functional.alltypes group
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-05:EXCHANGE [UNPARTITIONED]
+07:EXCHANGE [UNPARTITIONED]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: bool_col
+|
+05:EXCHANGE [HASH(bool_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: bool_col
 |
 04:AGGREGATE
 |  group by: bool_col, int_col
 |
-03:EXCHANGE [HASH(bool_col)]
+03:EXCHANGE [HASH(bool_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: bool_col, int_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index bdbc92f..afecff9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -402,14 +402,20 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 |  partitions=1
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: string_col
+|
+05:EXCHANGE [HASH(string_col)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: string_col
 |
 04:AGGREGATE
 |  group by: string_col, int_col
 |
-03:EXCHANGE [HASH(string_col)]
+03:EXCHANGE [HASH(string_col,int_col)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: string_col, int_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 436aa51..079d291 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -90,19 +90,25 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-06:PARTIAL SORT
+08:PARTIAL SORT
 |  order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST
 |
-05:EXCHANGE [KUDU(KuduPartition(count(id)))]
+07:EXCHANGE [KUDU(KuduPartition(count(id)))]
 |
-02:AGGREGATE [FINALIZE]
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(id)
+|  group by: name
+|
+05:EXCHANGE [HASH(name)]
+|
+02:AGGREGATE [STREAMING]
 |  output: count(id)
 |  group by: name
 |
 04:AGGREGATE
 |  group by: name, id
 |
-03:EXCHANGE [HASH(name)]
+03:EXCHANGE [HASH(name,id)]
 |
 01:AGGREGATE [STREAMING]
 |  group by: name, id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index 4713ff3..2c6db60 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -2917,20 +2917,26 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-12:MERGING-EXCHANGE [UNPARTITIONED]
+14:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
 07:SORT
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
-06:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(ps_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+12:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+06:AGGREGATE [STREAMING]
 |  output: count(ps_suppkey)
 |  group by: p_brand, p_type, p_size
 |
 11:AGGREGATE
 |  group by: p_brand, p_type, p_size, ps_suppkey
 |
-10:EXCHANGE [HASH(p_brand,p_type,p_size)]
+10:EXCHANGE [HASH(p_brand,p_type,p_size,ps_suppkey)]
 |
 05:AGGREGATE [STREAMING]
 |  group by: p_brand, p_type, p_size, ps_suppkey
@@ -2960,20 +2966,26 @@ PLAN-ROOT SINK
 ---- PARALLELPLANS
 PLAN-ROOT SINK
 |
-12:MERGING-EXCHANGE [UNPARTITIONED]
+14:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
 07:SORT
 |  order by: count(ps_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
-06:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(ps_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+12:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+06:AGGREGATE [STREAMING]
 |  output: count(ps_suppkey)
 |  group by: p_brand, p_type, p_size
 |
 11:AGGREGATE
 |  group by: p_brand, p_type, p_size, ps_suppkey
 |
-10:EXCHANGE [HASH(p_brand,p_type,p_size)]
+10:EXCHANGE [HASH(p_brand,p_type,p_size,ps_suppkey)]
 |
 05:AGGREGATE [STREAMING]
 |  group by: p_brand, p_type, p_size, ps_suppkey

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b660bd65/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
index dd1818a..aad75e0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
@@ -1854,20 +1854,26 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-13:MERGING-EXCHANGE [UNPARTITIONED]
+15:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(s_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
 09:SORT
 |  order by: count(s_suppkey) DESC, p_brand ASC, p_type ASC, p_size ASC
 |
-08:AGGREGATE [FINALIZE]
+14:AGGREGATE [FINALIZE]
+|  output: count:merge(s_suppkey)
+|  group by: p_brand, p_type, p_size
+|
+13:EXCHANGE [HASH(p_brand,p_type,p_size)]
+|
+08:AGGREGATE [STREAMING]
 |  output: count(s_suppkey)
 |  group by: p_brand, p_type, p_size
 |
 12:AGGREGATE
 |  group by: p_brand, p_type, p_size, s_suppkey
 |
-11:EXCHANGE [HASH(p_brand,p_type,p_size)]
+11:EXCHANGE [HASH(p_brand,p_type,p_size,s_suppkey)]
 |
 07:AGGREGATE [STREAMING]
 |  group by: p_brand, p_type, p_size, s_suppkey


Mime
View raw message