hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stak...@apache.org
Subject hive git commit: HIVE-18368: Improve Spark Debug RDD Graph (Sahil Takiar, reviewed by Rui Li)
Date Thu, 08 Feb 2018 20:46:31 GMT
Repository: hive
Updated Branches:
  refs/heads/master 43e713746 -> 1e74aca8d


HIVE-18368: Improve Spark Debug RDD Graph (Sahil Takiar, reviewed by Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e74aca8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e74aca8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e74aca8

Branch: refs/heads/master
Commit: 1e74aca8d09ea2ef636311d2168b4d34198f7194
Parents: 43e7137
Author: Sahil Takiar <takiar.sahil@gmail.com>
Authored: Thu Feb 8 12:45:58 2018 -0800
Committer: Sahil Takiar <stakiar@cloudera.com>
Committed: Thu Feb 8 12:45:58 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/CacheTran.java    |  14 +-
 .../ql/exec/spark/LocalHiveSparkClient.java     |   6 +
 .../hadoop/hive/ql/exec/spark/MapInput.java     |  13 +-
 .../hadoop/hive/ql/exec/spark/MapTran.java      |  17 +-
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   |  17 +-
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  |  19 ++-
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    | 164 ++-----------------
 .../hive/ql/exec/spark/SparkPlanGenerator.java  |  35 +++-
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |   2 -
 .../hive/ql/exec/spark/SparkUtilities.java      |  36 +---
 .../hive/ql/io/CombineHiveInputFormat.java      |   2 +-
 11 files changed, 85 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
index c5fec7d..4b77ac9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -27,9 +27,11 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO
extends Wr
   // whether to cache current RDD.
   private boolean caching = false;
   private JavaPairRDD<KO, VO> cachedRDD;
+  protected final String name;
 
-  protected CacheTran(boolean cache) {
+  protected CacheTran(boolean cache, String name) {
     this.caching = cache;
+    this.name = name;
   }
 
   @Override
@@ -40,9 +42,10 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO
extends Wr
         cachedRDD = doTransform(input);
         cachedRDD.persist(StorageLevel.MEMORY_AND_DISK());
       }
-      return cachedRDD;
+      return cachedRDD.setName(this.name + " (" + cachedRDD.getNumPartitions() + ", cached)");
     } else {
-      return doTransform(input);
+      JavaPairRDD<KO, VO> rdd = doTransform(input);
+      return rdd.setName(this.name + " (" + rdd.getNumPartitions() + ")");
     }
   }
 
@@ -51,4 +54,9 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO
extends Wr
   }
 
   protected abstract JavaPairRDD<KO, VO> doTransform(JavaPairRDD<KI, VI> input);
+
+  @Override
+  public String getName() {
+    return name;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index cab97a0..f43b449 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -25,7 +25,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -160,8 +162,12 @@ public class LocalHiveSparkClient implements HiveSparkClient {
 
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+
+    sc.setJobGroup("queryId = " + sparkWork.getQueryId(), DagUtils.getQueryName(jobConf));
+
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.
     JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+
     // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
     LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index d240d18..b1a0d55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -36,17 +36,18 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
   private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
   private boolean toCache;
   private final SparkPlan sparkPlan;
-  private String name = "MapInput";
+  private final String name;
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD)
{
-    this(sparkPlan, hadoopRDD, false);
+    this(sparkPlan, hadoopRDD, false, "MapInput");
   }
 
   public MapInput(SparkPlan sparkPlan,
-      JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
+      JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String
name) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
+    this.name = name;
   }
 
   public void setToCache(boolean toCache) {
@@ -66,6 +67,7 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
     } else {
       result = hadoopRDD;
     }
+    result.setName(this.name);
     return result;
   }
 
@@ -96,9 +98,4 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
   public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
-
-  @Override
-  public void setName(String name) {
-    this.name = name;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 2cc6845..b102f51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -24,14 +24,13 @@ import org.apache.spark.api.java.JavaPairRDD;
 
 public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, BytesWritable>
{
   private HiveMapFunction mapFunc;
-  private String name = "MapTran";
 
   public MapTran() {
-    this(false);
+    this(false, "MapTran");
   }
 
-  public MapTran(boolean cache) {
-    super(cache);
+  public MapTran(boolean cache, String name) {
+    super(cache, name);
   }
 
   @Override
@@ -43,14 +42,4 @@ public class MapTran extends CacheTran<BytesWritable, BytesWritable,
HiveKey, By
   public void setMapFunction(HiveMapFunction mapFunc) {
     this.mapFunc = mapFunc;
   }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public void setName(String name) {
-    this.name = name;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 9045e05..3b34c78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -24,14 +24,13 @@ import org.apache.spark.api.java.JavaPairRDD;
 
 public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable>
{
   private HiveReduceFunction<V> reduceFunc;
-  private String name = "Reduce";
 
   public ReduceTran() {
-    this(false);
+    this(false, "Reduce");
   }
 
-  public ReduceTran(boolean caching) {
-    super(caching);
+  public ReduceTran(boolean caching, String name) {
+    super(caching, name);
   }
 
   @Override
@@ -43,14 +42,4 @@ public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey,
BytesWritable>
   public void setReduceFunction(HiveReduceFunction<V> redFunc) {
     this.reduceFunc = redFunc;
   }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public void setName(String name) {
-    this.name = name;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index aec96bc..40ff01a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
@@ -28,17 +29,21 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable,
HiveKey, B
   private final int numOfPartitions;
   private final boolean toCache;
   private final SparkPlan sparkPlan;
-  private String name = "Shuffle";
+  private final String name;
+  private final SparkEdgeProperty edge;
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
-    this(sparkPlan, sf, n, false);
+    this(sparkPlan, sf, n, false, "Shuffle", null);
   }
 
-  public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache) {
+  public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String
name,
+                     SparkEdgeProperty edge) {
     shuffler = sf;
     numOfPartitions = n;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
+    this.name = name;
+    this.edge = edge;
   }
 
   @Override
@@ -48,7 +53,8 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable,
HiveKey, B
       sparkPlan.addCachedRDDId(result.id());
       result = result.persist(StorageLevel.MEMORY_AND_DISK());
     }
-    return result;
+    return result.setName(this.name + " (" + edge.getShuffleType() + ", " + numOfPartitions
+
+            (toCache ? ", cached)" : ")"));
   }
 
   public int getNoOfPartitions() {
@@ -65,11 +71,6 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable,
HiveKey, B
     return new Boolean(toCache);
   }
 
-  @Override
-  public void setName(String name) {
-    this.name = name;
-  }
-
   public SparkShuffler getShuffler() {
     return shuffler;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index 5d27692..b21e386 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -48,6 +50,12 @@ public class SparkPlan {
   private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran,
List<SparkTran>>();
   private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
 
+  private final SparkContext sc;
+
+  SparkPlan(SparkContext sc) {
+    this.sc = sc;
+  }
+
   @SuppressWarnings("unchecked")
   public JavaPairRDD<HiveKey, BytesWritable> generateGraph() {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
@@ -60,6 +68,7 @@ public class SparkPlan {
         // Root tran, it must be MapInput
         Preconditions.checkArgument(tran instanceof MapInput,
             "AssertionError: tran must be an instance of MapInput");
+        sc.setCallSite(CallSite.apply(tran.getName(), ""));
         rdd = tran.transform(null);
       } else {
         for (SparkTran parent : parents) {
@@ -67,174 +76,37 @@ public class SparkPlan {
           if (rdd == null) {
             rdd = prevRDD;
           } else {
+            sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " +
+                            prevRDD.name() + ")", ""));
             rdd = rdd.union(prevRDD);
+            rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")");
           }
         }
+        sc.setCallSite(CallSite.apply(tran.getName(), ""));
         rdd = tran.transform(rdd);
       }
 
       tranToOutputRDDMap.put(tran, rdd);
     }
 
-    logSparkPlan();
-
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran);
       if (finalRDD == null) {
         finalRDD = rdd;
       } else {
+        sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " + finalRDD.name()
+ ")",
+                ""));
         finalRDD = finalRDD.union(rdd);
+        finalRDD.setName("UnionRDD (" + finalRDD.getNumPartitions() + ")");
       }
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
-    if (LOG.isDebugEnabled()) {
-      LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD));
-    }
-    return finalRDD;
-  }
-
-  private void addNumberToTrans() {
-    int i = 1;
-    String name = null;
-
-    // Traverse leafTran & transGraph add numbers to trans
-    for (SparkTran leaf : leafTrans) {
-      name = leaf.getName() + " " + i++;
-      leaf.setName(name);
-    }
-    Set<SparkTran> sparkTrans = transGraph.keySet();
-    for (SparkTran tran : sparkTrans) {
-      name = tran.getName() + " " + i++;
-      tran.setName(name);
-    }
-  }
-
-  private void logSparkPlan() {
-    addNumberToTrans();
-    ArrayList<SparkTran> leafTran = new ArrayList<SparkTran>();
-    leafTran.addAll(leafTrans);
-
-    for (SparkTran leaf : leafTrans) {
-      collectLeafTrans(leaf, leafTran);
-    }
-
-    // Start Traverse from the leafTrans and get parents of each leafTrans till
-    // the end
-    StringBuilder sparkPlan = new StringBuilder(
-      "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n");
-    for (SparkTran leaf : leafTran) {
-      sparkPlan.append(leaf.getName());
-      getSparkPlan(leaf, sparkPlan);
-      sparkPlan.append("\n");
-    }
-    sparkPlan
-      .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
");
-    LOG.info(sparkPlan.toString());
-  }
-
-  private void collectLeafTrans(SparkTran leaf, List<SparkTran> reduceTrans) {
-    List<SparkTran> parents = getParents(leaf);
-    if (parents.size() > 0) {
-      SparkTran nextLeaf = null;
-      for (SparkTran leafTran : parents) {
-        if (leafTran instanceof ReduceTran) {
-          reduceTrans.add(leafTran);
-        } else {
-          if (getParents(leafTran).size() > 0)
-            nextLeaf = leafTran;
-        }
-      }
-      if (nextLeaf != null)
-        collectLeafTrans(nextLeaf, reduceTrans);
-    }
-  }
-
-  private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) {
-    List<SparkTran> parents = getParents(tran);
-    List<SparkTran> nextLeaf = new ArrayList<SparkTran>();
-    if (parents.size() > 0) {
-      sparkPlan.append(" <-- ");
-      boolean isFirst = true;
-      for (SparkTran leaf : parents) {
-        if (isFirst) {
-          sparkPlan.append("( " + leaf.getName());
-          if (leaf instanceof ShuffleTran) {
-            logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-          } else {
-            logCacheStatus(leaf, sparkPlan);
-          }
-          isFirst = false;
-        } else {
-          sparkPlan.append("," + leaf.getName());
-          if (leaf instanceof ShuffleTran) {
-            logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-          } else {
-            logCacheStatus(leaf, sparkPlan);
-          }
-        }
-        // Leave reduceTran it will be expanded in the next line
-        if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) {
-          nextLeaf.add(leaf);
-        }
-      }
-      sparkPlan.append(" ) ");
-      if (nextLeaf.size() > 1) {
-        logLeafTran(nextLeaf, sparkPlan);
-      } else {
-        if (nextLeaf.size() != 0)
-          getSparkPlan(nextLeaf.get(0), sparkPlan);
-      }
-    }
-  }
-
-  private void logLeafTran(List<SparkTran> parent, StringBuilder sparkPlan) {
-    sparkPlan.append(" <-- ");
-    boolean isFirst = true;
-    for (SparkTran sparkTran : parent) {
-      List<SparkTran> parents = getParents(sparkTran);
-      SparkTran leaf = parents.get(0);
-      if (isFirst) {
-        sparkPlan.append("( " + leaf.getName());
-        if (leaf instanceof ShuffleTran) {
-          logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-        } else {
-          logCacheStatus(leaf, sparkPlan);
-        }
-        isFirst = false;
-      } else {
-        sparkPlan.append("," + leaf.getName());
-        if (leaf instanceof ShuffleTran) {
-          logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-        } else {
-          logCacheStatus(leaf, sparkPlan);
-        }
-      }
-    }
-    sparkPlan.append(" ) ");
-  }
 
-  private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) {
-    int noOfPartitions = leaf.getNoOfPartitions();
-    sparkPlan.append(" ( Partitions " + noOfPartitions);
-    SparkShuffler shuffler = leaf.getShuffler();
-    sparkPlan.append(", " + shuffler.getName());
-    if (leaf.isCacheEnable()) {
-      sparkPlan.append(", Cache on");
-    } else {
-      sparkPlan.append(", Cache off");
-    }
-  }
+    LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n");
 
-  private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) {
-    if (sparkTran.isCacheEnable() != null) {
-      if (sparkTran.isCacheEnable().booleanValue()) {
-        sparkPlan.append(" (cache on) ");
-      } else {
-        sparkPlan.append(" (cache off) ");
-      }
-    }
+    return finalRDD;
   }
 
   public void addTran(SparkTran tran) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index c52692d..c9a3196 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -98,7 +102,7 @@ public class SparkPlanGenerator {
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
-    SparkPlan sparkPlan = new SparkPlan();
+    SparkPlan sparkPlan = new SparkPlan(this.sc.sc());
     cloneToWork = sparkWork.getCloneToWork();
     workToTranMap.clear();
     workToParentWorkTranMap.clear();
@@ -138,9 +142,10 @@ public class SparkPlanGenerator {
       result = generateMapInput(sparkPlan, (MapWork)work);
       sparkPlan.addTran(result);
     } else if (work instanceof ReduceWork) {
+      boolean toCache = cloneToWork.containsKey(work);
       List<BaseWork> parentWorks = sparkWork.getParents(work);
-      result = generate(sparkPlan,
-        sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+      SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0),
work);
+      result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName());
       sparkPlan.addTran(result);
       for (BaseWork parentWork : parentWorks) {
         sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -189,6 +194,8 @@ public class SparkPlanGenerator {
     JobConf jobConf = cloneJobConf(mapWork);
     Class ifClass = getInputFormat(jobConf, mapWork);
 
+    sc.sc().setCallSite(CallSite.apply(mapWork.getName(), ""));
+
     JavaPairRDD<WritableComparable, Writable> hadoopRDD;
     if (mapWork.getNumMapTasks() != null) {
       jobConf.setNumMapTasks(mapWork.getNumMapTasks());
@@ -198,12 +205,24 @@ public class SparkPlanGenerator {
       hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
     }
 
+    boolean toCache = false/*cloneToWork.containsKey(mapWork)*/;
+
+    String tables = mapWork.getAllRootOperators().stream()
+            .filter(op -> op instanceof TableScanOperator)
+            .map(ts -> ((TableScanDesc) ts.getConf()).getAlias())
+            .collect(Collectors.joining(", "));
+
+    String rddName = mapWork.getName() + " (" + tables + ", " + hadoopRDD.getNumPartitions()
+
+            (toCache ? ", cached)" : ")");
+
     // Caching is disabled for MapInput due to HIVE-8920
-    MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName);
     return result;
   }
 
-  private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache)
{
+  private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache,
+                               String name) {
+
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
@@ -214,7 +233,7 @@ public class SparkPlanGenerator {
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
+    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge);
   }
 
   private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
@@ -238,12 +257,12 @@ public class SparkPlanGenerator {
               "Can't make path " + outputPath + " : " + e.getMessage());
         }
       }
-      MapTran mapTran = new MapTran(caching);
+      MapTran mapTran = new MapTran(caching, work.getName());
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);
       return mapTran;
     } else if (work instanceof ReduceWork) {
-      ReduceTran reduceTran = new ReduceTran(caching);
+      ReduceTran reduceTran = new ReduceTran(caching, work.getName());
       HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
       reduceTran.setReduceFunction(reduceFunc);
       return reduceTran;

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 037efe1..f9057b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -28,7 +28,5 @@ public interface SparkTran<KI extends WritableComparable, VI, KO extends
Writabl
 
   public String getName();
 
-  public void setName(String name);
-
   public Boolean isCacheEnable();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index f332790..943a4ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -50,12 +50,8 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hive.spark.client.SparkClientUtilities;
-import org.apache.spark.Dependency;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.rdd.UnionRDD;
-import scala.collection.JavaConversions;
+
 
 /**
  * Contains utilities methods used as part of Spark tasks.
@@ -138,36 +134,6 @@ public class SparkUtilities {
     return sparkSession;
   }
 
-
-  public static String rddGraphToString(JavaPairRDD rdd) {
-    StringBuilder sb = new StringBuilder();
-    rddToString(rdd.rdd(), sb, "");
-    return sb.toString();
-  }
-
-  private static void rddToString(RDD rdd, StringBuilder sb, String offset) {
-    sb.append(offset).append(rdd.getClass().getCanonicalName()).append("[").append(rdd.hashCode()).append("]");
-    if (rdd.getStorageLevel().useMemory()) {
-      sb.append("(cached)");
-    }
-    sb.append("\n");
-    Collection<Dependency> dependencies = JavaConversions.asJavaCollection(rdd.dependencies());
-    if (dependencies != null) {
-      offset += "\t";
-      for (Dependency dependency : dependencies) {
-        RDD parentRdd = dependency.rdd();
-        rddToString(parentRdd, sb, offset);
-      }
-    } else if (rdd instanceof UnionRDD) {
-      UnionRDD unionRDD = (UnionRDD) rdd;
-      offset += "\t";
-      Collection<RDD> parentRdds = JavaConversions.asJavaCollection(unionRDD.rdds());
-      for (RDD parentRdd : parentRdds) {
-        rddToString(parentRdd, sb, offset);
-      }
-    }
-  }
-
   /**
    * Generate a temporary path for dynamic partition pruning in Spark branch
    * TODO: no longer need this if we use accumulator!

http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index b698987..1622ae2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -415,7 +415,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V
extends Writ
           combine.createPool(job, f);
           poolMap.put(combinePathInputFormat, f);
         } else {
-          LOG.info("CombineHiveInputSplit: pool is already created for " + path +
+          LOG.debug("CombineHiveInputSplit: pool is already created for " + path +
                    "; using filter path " + filterPath);
           f.addPath(filterPath);
         }


Mime
View raw message