hive-commits mailing list archives

From li...@apache.org
Subject hive git commit: HIVE-16948: Invalid explain when running dynamic partition pruning query in Hive On Spark (Liyun Zhang, reviewed by Sahil and Rui)
Date Fri, 18 Aug 2017 07:25:25 GMT
Repository: hive
Updated Branches:
  refs/heads/master c9e09400f -> 94125a307


HIVE-16948: Invalid explain when running dynamic partition pruning query in Hive On Spark
(Liyun Zhang, reviewed by Sahil and Rui)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/94125a30
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/94125a30
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/94125a30

Branch: refs/heads/master
Commit: 94125a307ac18c69d0d34e1bc86f1035608a3a94
Parents: c9e0940
Author: Liyun Zhang <liyun.zhang@intel.com>
Authored: Fri Aug 18 15:25:21 2017 +0800
Committer: Rui Li <lirui@apache.org>
Committed: Fri Aug 18 15:25:21 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/OperatorUtils.java      | 17 ++++-
 .../hive/ql/exec/spark/SparkUtilities.java      | 20 ++++++
 .../spark/CombineEquivalentWorkResolver.java    | 74 +++++++++++++++++++-
 .../hive/ql/exec/spark/TestSparkTask.java       | 49 +++++++++++++
 .../spark/spark_dynamic_partition_pruning.q.out | 28 --------
 5 files changed, 158 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/94125a30/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index be0795d..7308b5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -367,7 +368,7 @@ public class OperatorUtils {
    * Remove the branch that contains the specified operator. Do nothing if there's no branching,
    * i.e. all the upstream operators have only one child.
    */
-  public static void removeBranch(Operator<?> op) {
+  public static void removeBranch(SparkPartitionPruningSinkOperator op) {
     Operator<?> child = op;
     Operator<?> curr = op;
 
@@ -409,4 +410,18 @@ public class OperatorUtils {
     }
     return op.toString();
   }
+
+  /**
+   * Return true if the operator is in a branch, otherwise return false.
+   */
+  public static boolean isInBranch(SparkPartitionPruningSinkOperator op) {
+    Operator<?> curr = op;
+    while (curr.getChildOperators().size() <= 1) {
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return false;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+    return true;
+  }
 }
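
For readers skimming the diff: the new isInBranch helper walks upward from a partition pruning sink along single-parent links until it either reaches a root (no branching above the sink) or hits an ancestor with more than one child (a branch that removeBranch can cut without dropping the whole work). As a rough illustration only, the self-contained sketch below replays that walk on a hypothetical MiniOp node; it is not Hive code, just the same traversal over simplified parent/child links.

import java.util.ArrayList;
import java.util.List;

class MiniOp {
  final String name;
  final List<MiniOp> parents = new ArrayList<>();
  final List<MiniOp> children = new ArrayList<>();

  MiniOp(String name) { this.name = name; }

  static void link(MiniOp parent, MiniOp child) {
    parent.children.add(child);
    child.parents.add(parent);
  }

  // Walk upward from 'op'; report true as soon as an ancestor has more than one child,
  // i.e. 'op' hangs off a branch that can be cut on its own.
  static boolean isInBranch(MiniOp op) {
    MiniOp curr = op;
    while (curr.children.size() <= 1) {
      if (curr.parents.isEmpty()) {
        return false;            // reached a root without seeing a fork
      }
      curr = curr.parents.get(0);
    }
    return true;                 // found an ancestor with more than one child
  }

  public static void main(String[] args) {
    MiniOp ts = new MiniOp("TS");    // table scan
    MiniOp sel = new MiniOp("SEL");  // fork point with two children
    MiniOp rs = new MiniOp("RS");    // reduce sink branch
    MiniOp dpp = new MiniOp("DPP");  // partition pruning sink branch
    link(ts, sel);
    link(sel, rs);
    link(sel, dpp);
    System.out.println(isInBranch(dpp));  // true: SEL forks, so only the DPP branch is removed
    System.out.println(isInBranch(ts));   // false: nothing forks at or above TS
  }
}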

http://git-wip-us.apache.org/repos/asf/hive/blob/94125a30/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index eb9883a..fac3cea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
 
 import com.google.common.base.Preconditions;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -207,4 +209,22 @@ public class SparkUtilities {
       collectOp(result, child, clazz);
     }
   }
+
+  /**
+   * Remove currTask from the children of its parent tasks and
+   * remove currTask from the parents of its child tasks.
+   * @param currTask the task to detach
+   */
+  public static void removeEmptySparkTask(SparkTask currTask) {
+    //remove currTask from parentTasks
+    ArrayList<Task> parTasks = new ArrayList<Task>();
+    parTasks.addAll(currTask.getParentTasks());
+
+    Object[] parTaskArr = parTasks.toArray();
+    for (Object parTask : parTaskArr) {
+      ((Task) parTask).removeDependentTask(currTask);
+    }
+    //remove currTask from childTasks
+    currTask.removeFromChildrenTasks();
+  }
 }
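
In plain terms, the new SparkUtilities.removeEmptySparkTask unhooks a task from both sides of the task DAG: every parent task drops it as a dependent, and its children stop listing it as a parent. The stand-alone sketch below mimics that unlinking with a hypothetical MiniTask class (not Hive's Task) and mirrors the shape of the unit test added further down in this commit.

import java.util.ArrayList;
import java.util.List;

class MiniTask {
  final String name;
  final List<MiniTask> parents = new ArrayList<>();
  final List<MiniTask> children = new ArrayList<>();

  MiniTask(String name) { this.name = name; }

  void addDependentTask(MiniTask child) {
    children.add(child);
    child.parents.add(this);
  }

  void removeDependentTask(MiniTask child) {
    children.remove(child);
    child.parents.remove(this);
  }

  // Rough analogue of removeEmptySparkTask: detach 'curr' from its parents and children.
  static void removeEmptyTask(MiniTask curr) {
    for (MiniTask parent : new ArrayList<>(curr.parents)) {
      parent.removeDependentTask(curr);   // parents forget curr
    }
    for (MiniTask child : new ArrayList<>(curr.children)) {
      child.parents.remove(curr);         // children forget curr
    }
    curr.children.clear();
  }

  public static void main(String[] args) {
    MiniTask grandpa = new MiniTask("grandpa");
    MiniTask parent = new MiniTask("parent");  // stands in for the task left with no work
    MiniTask child = new MiniTask("child");
    grandpa.addDependentTask(parent);
    parent.addDependentTask(child);

    removeEmptyTask(parent);
    System.out.println(grandpa.children.size());  // 0
    System.out.println(child.parents.size());     // 0
  }
}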

http://git-wip-us.apache.org/repos/asf/hive/blob/94125a30/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 95ad962..2641c1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -30,6 +31,10 @@ import java.util.Stack;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.slf4j.Logger;
@@ -56,9 +61,11 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
  */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-
+  private List<String> removedMapWorkNames = new ArrayList<String>();
+  private PhysicalContext pctx;
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    this.pctx = pctx;
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getRootTasks());
     TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
@@ -82,6 +89,17 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
         SparkWork sparkWork = sparkTask.getWork();
         Set<BaseWork> roots = sparkWork.getRoots();
         compareWorksRecursively(roots, sparkWork);
+        // For the dpp case, the dpp sink will appear in Task1 and the target work of the dpp sink will appear in Task2.
+        // Task2 is the child task of Task1. Task2 will be traversed before Task1 because TaskGraphWalker
+        // puts children tasks at the front of the task queue.
+        // If a spark work which equals another is found and removed in Task2, the dpp sink can be removed
+        // when Task1 is traversed (see HIVE-16948 for details).
+        if (removedMapWorkNames.size() > 0) {
+          removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
+          if (sparkWork.getAllWork().size() == 0) {
+            removeEmptySparkTask(sparkTask);
+          }
+        }
       }
       return null;
     }
@@ -170,6 +188,10 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
         }
       }
       sparkWork.remove(previous);
+      // In order to fix HIVE-16948
+      if (previous instanceof MapWork) {
+        removedMapWorkNames.add(previous.getName());
+      }
     }
 
     /*
@@ -306,5 +328,55 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
     private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
       return firstOperator.logicalEquals(secondOperator);
     }
+
+    /**
+     * Traverse the children in sparkWork to find the dpp sink operators whose target work is included in
+     * removedMapWorkList.
+     * If there is a branch, remove the prune sink operator branch in the BaseWork;
+     * if there is no branch, remove the whole BaseWork.
+     *
+     * @param removedMapWorkList the names of the map works that have been deleted because they equal other works
+     * @param sparkWork          the current spark work
+     */
+    private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
+      List<BaseWork> allWorks = sparkWork.getAllWork();
+      for (BaseWork baseWork : allWorks) {
+        Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
+        for (Operator root : rootOperators) {
+          List<Operator<?>> pruningList = new ArrayList<>();
+          SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
+          for (Operator pruneSinkOp : pruningList) {
+            SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
+            if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetWork())) {
+              LOG.debug("ready to remove the sparkPruneSinkOp whose target work is " +
+                  sparkPruneSinkOp.getConf().getTargetWork() + " because the MapWork equals another map work and " +
+                  "has been deleted!");
+              // If there is a branch, remove the prune sink operator branch in the baseWork;
+              // if there is no branch, remove the whole baseWork.
+              if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
+                OperatorUtils.removeBranch(sparkPruneSinkOp);
+              } else {
+                sparkWork.remove(baseWork);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    private void removeEmptySparkTask(SparkTask currTask) {
+      // If currTask is a root task, remove it and add to the root tasks those of its children
+      // for which currTask is the only parent task.
+      if (pctx.getRootTasks().contains(currTask)) {
+        pctx.removeFromRootTask(currTask);
+        List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
+        for (Task newRoot : newRoots) {
+          if (newRoot.getParentTasks().size() == 1) {
+            pctx.addToRootTask(newRoot);
+          }
+        }
+      }
+      SparkUtilities.removeEmptySparkTask(currTask);
+    }
   }
 }
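
The crux of the resolver change is ordering: TaskGraphWalker visits child SparkTasks before their parents, so the names of MapWorks removed while combining equivalent works in a child task are remembered in removedMapWorkNames and consulted later, when the parent task holding the corresponding Spark Partition Pruning Sink operators is dispatched. The self-contained sketch below is only meant to show that two-step hand-off; the work names ("Map 1", "Map 4") are made up and plain collections stand in for Hive's classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DppPruneSketch {
  // Names of MapWorks removed while visiting child tasks; kept as field state so it
  // survives across dispatch calls, like removedMapWorkNames in the resolver.
  private final List<String> removedMapWorkNames = new ArrayList<>();

  // Visited first: "Map 4" turns out to be equivalent to "Map 1", is dropped, and its
  // name is recorded.
  void visitChildTask(Set<String> childWorks) {
    if (childWorks.remove("Map 4")) {
      removedMapWorkNames.add("Map 4");
    }
  }

  // Visited afterwards: any pruning-sink target that points at a removed work is dropped,
  // so no stale "target work: Map 4" lingers in the explain output.
  void visitParentTask(List<String> dppSinkTargets) {
    dppSinkTargets.removeIf(removedMapWorkNames::contains);
  }

  public static void main(String[] args) {
    DppPruneSketch resolver = new DppPruneSketch();
    Set<String> childWorks = new HashSet<>(Arrays.asList("Map 1", "Map 4"));
    List<String> dppSinkTargets = new ArrayList<>(Arrays.asList("Map 1", "Map 4"));

    resolver.visitChildTask(childWorks);       // child task is walked first
    resolver.visitParentTask(dppSinkTargets);  // then the parent task holding the sinks

    System.out.println(childWorks);      // [Map 1]
    System.out.println(dppSinkTargets);  // [Map 1] -- the sink that targeted Map 4 is gone
  }
}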

http://git-wip-us.apache.org/repos/asf/hive/blob/94125a30/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 4c7ec76..44931f0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -22,9 +22,16 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -43,4 +50,46 @@ public class TestSparkTask {
     verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS);
   }
 
+  @Test
+  public void removeEmptySparkTask() {
+    SparkTask grandpa = new SparkTask();
+    SparkWork grandpaWork = new SparkWork("grandpa");
+    grandpaWork.add(new MapWork());
+    grandpa.setWork(grandpaWork);
+
+    SparkTask parent = new SparkTask();
+    SparkWork parentWork = new SparkWork("parent");
+    parentWork.add(new MapWork());
+    parent.setWork(parentWork);
+
+    SparkTask child1 = new SparkTask();
+    SparkWork childWork1 = new SparkWork("child1");
+    childWork1.add(new MapWork());
+    child1.setWork(childWork1);
+
+
+    grandpa.addDependentTask(parent);
+    parent.addDependentTask(child1);
+
+    Assert.assertEquals(grandpa.getChildTasks().size(), 1);
+    Assert.assertEquals(child1.getParentTasks().size(), 1);
+    if (isEmptySparkWork(parent.getWork())) {
+      SparkUtilities.removeEmptySparkTask(parent);
+    }
+
+    Assert.assertEquals(grandpa.getChildTasks().size(), 0);
+    Assert.assertEquals(child1.getParentTasks().size(), 0);
+  }
+
+  private boolean isEmptySparkWork(SparkWork sparkWork) {
+    List<BaseWork> allWorks = sparkWork.getAllWork();
+    boolean allWorksIsEmpty = true;
+    for (BaseWork work : allWorks) {
+      if (work.getAllOperators().size() > 0) {
+        allWorksIsEmpty = false;
+        break;
+      }
+    }
+    return allWorksIsEmpty;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94125a30/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index 1909485..080a152 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -4231,20 +4231,6 @@ STAGE PLANS:
                           partition key expr: ds
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                           target work: Map 1
-                    Select Operator
-                      expressions: _col0 (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      Group By Operator
-                        keys: _col0 (type: string)
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                        Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
-                          Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 4
         Reducer 13 
             Reduce Operator Tree:
               Group By Operator
@@ -4274,20 +4260,6 @@ STAGE PLANS:
                           partition key expr: ds
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                           target work: Map 1
-                    Select Operator
-                      expressions: _col0 (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      Group By Operator
-                        keys: _col0 (type: string)
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                        Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
-                          Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 4
 
   Stage: Stage-1
     Spark

