hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject [07/50] [abbrv] hive git commit: HIVE-8858: Visualize generated Spark plan [Spark Branch] (Chinna via Jimmy)
Date Fri, 05 Jun 2015 23:16:20 GMT
HIVE-8858: Visualize generated Spark plan [Spark Branch] (Chinna via Jimmy)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb48ffdb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb48ffdb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb48ffdb

Branch: refs/heads/hbase-metastore
Commit: eb48ffdbfc0690480e2aeb53f03e249ad6091378
Parents: 6f00507
Author: Jimmy Xiang <jxiang@cloudera.com>
Authored: Wed Apr 22 14:51:35 2015 -0700
Committer: Xuefu Zhang <xzhang@Cloudera.com>
Committed: Mon Jun 1 14:02:18 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/MapInput.java     | 10 ++++
 .../hadoop/hive/ql/exec/spark/MapTran.java      | 10 ++++
 .../hadoop/hive/ql/exec/spark/ReduceTran.java   | 11 ++++
 .../hadoop/hive/ql/exec/spark/ShuffleTran.java  | 10 ++++
 .../hadoop/hive/ql/exec/spark/SparkPlan.java    | 58 ++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/SparkTran.java    |  4 ++
 6 files changed, 103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 8d18885..157e4d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -85,4 +85,14 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
     }
 
   }
+
+  @Override
+  public String getName() {
+    return "MapInput";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    return new Boolean(toCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 638c387..f6a4d77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -35,4 +35,14 @@ public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey,
     this.mapFunc = mapFunc;
   }
 
+  @Override
+  public String getName() {
+    return "MapTran";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index dbc614b..fd6b31c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -34,4 +34,15 @@ public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, H
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
+
+  @Override
+  public String getName() {
+    return "Reduce";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 4a597ee..6cdab20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -49,4 +49,14 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, I
     }
     return result;
   }
+
+  @Override
+  public String getName() {
+    return "Shuffle";
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    return new Boolean(toCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b45494d..81b7e85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.io.BytesWritable;
@@ -36,6 +38,7 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("rawtypes")
 public class SparkPlan {
   private static final String CLASS_NAME = SparkPlan.class.getName();
+  private static final Log LOG = LogFactory.getLog(SparkPlan.class);
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
@@ -72,6 +75,8 @@ public class SparkPlan {
       tranToOutputRDDMap.put(tran, rdd);
     }
 
+    logSparkPlan();
+
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran);
@@ -86,6 +91,59 @@ public class SparkPlan {
     return finalRDD;
   }
 
+  private void logSparkPlan() {
+    LOG.info("------------------------------ Spark Plan -----------------------------");
+    Set<SparkTran> keySet = invertedTransGraph.keySet();
+    for (SparkTran sparkTran : keySet) {
+      if (sparkTran instanceof ReduceTran) {
+	String sparkPlan = "	" + sparkTran.getName();
+	sparkPlan = getSparkPlan(sparkTran, sparkPlan);
+	LOG.info(sparkPlan);
+      }
+    }
+    LOG.info("------------------------------ Spark Plan -----------------------------");
+  }
+
+  private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) {
+    if (leaf != null) {
+      List<SparkTran> parents = getParents(leaf);
+      if (parents.size() > 0) {
+	sparkPlanMsg = sparkPlanMsg + " <-- ";
+	boolean isFirst = true;
+	SparkTran parent = null;
+	for (SparkTran sparkTran : parents) {
+	  if (isFirst) {
+	    sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName();
+	    sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
+	    isFirst = false;
+	  } else {
+	    sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName();
+	    sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
+	  }
+	  if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof ReduceTran)) {
+	    parent = sparkTran;
+	  }
+	}
+	sparkPlanMsg = sparkPlanMsg + " ) ";
+	return getSparkPlan(parent, sparkPlanMsg);
+      } else {
+	return sparkPlanMsg;
+      }
+    }
+    return sparkPlanMsg;
+  }
+
+  private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) {
+    if (sparkTran.isCacheEnable() != null) {
+      if (sparkTran.isCacheEnable().booleanValue()) {
+	sparkPlanMsg = sparkPlanMsg + " (cache on) ";
+      } else {
+	sparkPlanMsg = sparkPlanMsg + " (cache off) ";
+      }
+    }
+    return sparkPlanMsg;
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);

http://git-wip-us.apache.org/repos/asf/hive/blob/eb48ffdb/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 4daa61e..c3c48a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -25,4 +25,8 @@ import org.apache.spark.api.java.JavaPairRDD;
 public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);
+
+  public String getName();
+
+  public Boolean isCacheEnable();
 }


Mime
View raw message