Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 59542183FA for ; Tue, 9 Jun 2015 01:57:11 +0000 (UTC) Received: (qmail 92025 invoked by uid 500); 9 Jun 2015 01:57:09 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 91974 invoked by uid 500); 9 Jun 2015 01:57:09 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 91839 invoked by uid 99); 9 Jun 2015 01:57:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 01:57:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42B07DFFD5; Tue, 9 Jun 2015 01:57:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xuefu@apache.org To: commits@hive.apache.org Date: Tue, 09 Jun 2015 01:57:10 -0000 Message-Id: <2cdb685597f74677a8c6f0b7713b07f5@git.apache.org> In-Reply-To: <50807250f7aa4d21881bc5eed563a377@git.apache.org> References: <50807250f7aa4d21881bc5eed563a377@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/50] [abbrv] hive git commit: HIVE-10626 Spark paln need to be updated [Spark Branch] (Chinna via Jimmy) HIVE-10626 Spark paln need to be updated [Spark Branch] (Chinna via Jimmy) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2df14f9b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2df14f9b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2df14f9b Branch: refs/heads/spark Commit: 2df14f9b04dec968751f7b336bde7a5451af9d93 Parents: 81e8e66 Author: Jimmy Xiang Authored: Fri May 8 13:14:24 2015 -0700 Committer: Xuefu Zhang Committed: Mon Jun 1 14:03:42 2015 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/spark/GroupByShuffler.java | 4 + .../hadoop/hive/ql/exec/spark/MapInput.java | 8 +- .../hadoop/hive/ql/exec/spark/MapTran.java | 9 +- .../hadoop/hive/ql/exec/spark/ReduceTran.java | 9 +- .../hadoop/hive/ql/exec/spark/ShuffleTran.java | 16 +- .../hive/ql/exec/spark/SortByShuffler.java | 5 + .../hadoop/hive/ql/exec/spark/SparkPlan.java | 165 ++++++++++++++----- .../hive/ql/exec/spark/SparkShuffler.java | 2 + .../hadoop/hive/ql/exec/spark/SparkTran.java | 2 + 9 files changed, 176 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java index b8e36cb..e128dd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java @@ -33,4 +33,8 @@ public class GroupByShuffler implements SparkShuffler { return input.groupByKey(); } + @Override + public String getName() { + return "GroupBy"; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index 157e4d8..26cfebd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -36,6 +36,7 @@ public class MapInput implements SparkTran hadoopRDD; private boolean toCache; private final SparkPlan sparkPlan; + private String name = "MapInput"; public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) { this(sparkPlan, hadoopRDD, false); @@ -88,11 +89,16 @@ public class MapInput implements SparkTran { private HiveMapFunction mapFunc; + private String name = "MapTran"; @Override public JavaPairRDD transform( @@ -37,12 +38,16 @@ public class MapTran implements SparkTran, HiveKey, BytesWritable> { private HiveReduceFunction reduceFunc; + private String name = "Reduce"; @Override public JavaPairRDD transform( @@ -37,12 +38,16 @@ public class ReduceTran implements SparkTran, H @Override public String getName() { - return "Reduce"; + return name; } @Override public Boolean isCacheEnable() { - // TODO Auto-generated method stub return null; } + + @Override + public void setName(String name) { + this.name = name; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index 6cdab20..a774395 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -28,6 +28,7 @@ public class ShuffleTran implements SparkTran>, HiveKey, Iterable> { http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index 81b7e85..ee5c78a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -91,57 +91,146 @@ public class SparkPlan { return finalRDD; } + private void addNumberToTrans() { + int i = 1; + String name = null; + + // Traverse leafTran & transGraph add numbers to trans + for (SparkTran leaf : leafTrans) { + name = leaf.getName() + " " + i++; + leaf.setName(name); + } + Set sparkTrans = transGraph.keySet(); + for (SparkTran tran : sparkTrans) { + name = tran.getName() + " " + i++; + tran.setName(name); + } + } + private void logSparkPlan() { - LOG.info("------------------------------ Spark Plan -----------------------------"); - Set keySet = invertedTransGraph.keySet(); - for (SparkTran sparkTran : keySet) { - if (sparkTran instanceof ReduceTran) { - String sparkPlan = " " + sparkTran.getName(); - sparkPlan = getSparkPlan(sparkTran, sparkPlan); - LOG.info(sparkPlan); + addNumberToTrans(); + ArrayList leafTran = new ArrayList(); + leafTran.addAll(leafTrans); + + for (SparkTran leaf : leafTrans) { + collectLeafTrans(leaf, leafTran); + } + + // Start Traverse from the leafTrans and get parents of each leafTrans till + // the end + StringBuilder sparkPlan = new StringBuilder( + "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n"); + for (SparkTran leaf : leafTran) { + sparkPlan.append(leaf.getName()); + getSparkPlan(leaf, sparkPlan); + sparkPlan.append("\n"); + } + sparkPlan + .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! "); + LOG.info(sparkPlan); + } + + private void collectLeafTrans(SparkTran leaf, List reduceTrans) { + List parents = getParents(leaf); + if (parents.size() > 0) { + SparkTran nextLeaf = null; + for (SparkTran leafTran : parents) { + if (leafTran instanceof ReduceTran) { + reduceTrans.add(leafTran); + } else { + if (getParents(leafTran).size() > 0) + nextLeaf = leafTran; + } } + if (nextLeaf != null) + collectLeafTrans(nextLeaf, reduceTrans); } - LOG.info("------------------------------ Spark Plan -----------------------------"); - } - - private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) { - if (leaf != null) { - List parents = getParents(leaf); - if (parents.size() > 0) { - sparkPlanMsg = sparkPlanMsg + " <-- "; - boolean isFirst = true; - SparkTran parent = null; - for (SparkTran sparkTran : parents) { - if (isFirst) { - sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName(); - sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran); - isFirst = false; - } else { - sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName(); - sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran); - } - if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof ReduceTran)) { - parent = sparkTran; - } - } - sparkPlanMsg = sparkPlanMsg + " ) "; - return getSparkPlan(parent, sparkPlanMsg); + } + + private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) { + List parents = getParents(tran); + List nextLeaf = new ArrayList(); + if (parents.size() > 0) { + sparkPlan.append(" <-- "); + boolean isFirst = true; + for (SparkTran leaf : parents) { + if (isFirst) { + sparkPlan.append("( " + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); + } else { + logCacheStatus(leaf, sparkPlan); + } + isFirst = false; + } else { + sparkPlan.append("," + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); + } else { + logCacheStatus(leaf, sparkPlan); + } + } + // Leave reduceTran it will be expanded in the next line + if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) { + nextLeaf.add(leaf); + } + } + sparkPlan.append(" ) "); + if (nextLeaf.size() > 1) { + logLeafTran(nextLeaf, sparkPlan); } else { - return sparkPlanMsg; + if (nextLeaf.size() != 0) + getSparkPlan(nextLeaf.get(0), sparkPlan); } } - return sparkPlanMsg; } - private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) { + private void logLeafTran(List parent, StringBuilder sparkPlan) { + sparkPlan.append(" <-- "); + boolean isFirst = true; + for (SparkTran sparkTran : parent) { + List parents = getParents(sparkTran); + SparkTran leaf = parents.get(0); + if (isFirst) { + sparkPlan.append("( " + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); + } else { + logCacheStatus(leaf, sparkPlan); + } + isFirst = false; + } else { + sparkPlan.append("," + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); + } else { + logCacheStatus(leaf, sparkPlan); + } + } + } + sparkPlan.append(" ) "); + } + + private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) { + int noOfPartitions = leaf.getNoOfPartitions(); + sparkPlan.append(" ( Partitions " + noOfPartitions); + SparkShuffler shuffler = leaf.getShuffler(); + sparkPlan.append(", " + shuffler.getName()); + if (leaf.isCacheEnable()) { + sparkPlan.append(", Cache on"); + } else { + sparkPlan.append(", Cache off"); + } + } + + private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) { if (sparkTran.isCacheEnable() != null) { if (sparkTran.isCacheEnable().booleanValue()) { - sparkPlanMsg = sparkPlanMsg + " (cache on) "; + sparkPlan.append(" (cache on) "); } else { - sparkPlanMsg = sparkPlanMsg + " (cache off) "; + sparkPlan.append(" (cache off) "); } } - return sparkPlanMsg; } public void addTran(SparkTran tran) { http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java index 53845a0..40e251f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java @@ -27,4 +27,6 @@ public interface SparkShuffler { JavaPairRDD> shuffle( JavaPairRDD input, int numPartitions); + public String getName(); + } http://git-wip-us.apache.org/repos/asf/hive/blob/2df14f9b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index c3c48a0..671c983 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -28,5 +28,7 @@ public interface SparkTran