hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1630642 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: MapInput.java SparkPlan.java SparkPlanGenerator.java
Date Fri, 10 Oct 2014 04:51:29 GMT
Author: xuefu
Date: Fri Oct 10 04:51:29 2014
New Revision: 1630642

URL: http://svn.apache.org/r1630642
Log:
HIVE-8275: Introduce MapInput encapsulating a Hadoop RDD [Spark Branch]

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1630642&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Fri Oct 10 04:51:29 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * MapInput encapsulates a Hadoop RDD that acts as a root (source) transformation
+ * in a Spark plan. It holds a pre-built {@code JavaPairRDD} and, on transform(),
+ * simply hands it downstream, optionally caching it first.
+ */
+public class MapInput implements SparkTran<BytesWritable, HiveKey> {
+  // The underlying Hadoop RDD this source wraps; produced elsewhere
+  // (presumably from the job's input format — see SparkPlanGenerator).
+  private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
+  // When true, transform() caches the RDD in Spark before returning it.
+  private boolean toCache;
+
+  public MapInput(JavaPairRDD<HiveKey, BytesWritable> hadoopRDD) {
+    this.hadoopRDD = hadoopRDD;
+  }
+
+  public MapInput(JavaPairRDD<HiveKey, BytesWritable> hadoopRDD, boolean toCache) {
+    this.hadoopRDD = hadoopRDD;
+    this.toCache = toCache;
+  }
+
+  public void setToCache(boolean toCache) {
+    this.toCache = toCache;
+  }
+
+  /**
+   * Returns the wrapped Hadoop RDD, caching it first when toCache is set.
+   *
+   * @param input must be null — MapInput is a source and takes no upstream input
+   * @return the wrapped RDD (cached if toCache is true)
+   * @throws IllegalArgumentException if input is non-null (via Preconditions.checkArgument)
+   */
+  @Override
+  public JavaPairRDD<HiveKey, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    Preconditions.checkArgument(input == null,
+        "AssertionError: MapInput doesn't take any input");
+    return toCache ? hadoopRDD.cache() : hadoopRDD;
+  }
+
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1630642&r1=1630641&r2=1630642&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Fri Oct 10 04:51:29 2014
@@ -22,6 +22,9 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -30,56 +33,55 @@ import java.util.Map;
 import java.util.Set;
 
 public class SparkPlan {
-
+  private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
   private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
   private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
   private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
-  private final Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> mapInputs =
-      new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
-
-  public void addInput(SparkTran tran, JavaPairRDD<HiveKey, BytesWritable> input) {
-    if (!mapInputs.containsKey(tran)) {
-      mapInputs.put(tran, input);
-      leafTrans.add(tran);
-      transGraph.put(tran, new LinkedList<SparkTran>());
-      invertedTransGraph.put(tran, new LinkedList<SparkTran>());
-    }
-  }
 
   public void execute() throws IllegalStateException {
-    Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToRDDMap
+    Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
         = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
     for (SparkTran tran : getAllTrans()) {
       JavaPairRDD<HiveKey, BytesWritable> rdd = null;
-      if (mapInputs.containsKey(tran)) {
-        rdd = mapInputs.get(tran);
+      List<SparkTran> parents = getParents(tran);
+      if (parents.size() == 0) {
+        // Root tran, it must be MapInput
+        Preconditions.checkArgument(tran instanceof MapInput,
+            "AssertionError: tran must be an instance of MapInput");
+        rdd = tran.transform(null);
       } else {
-        // a non-root tran, it must have a previous input
-        for (SparkTran parentTran : getParents(tran)) {
-          JavaPairRDD<HiveKey, BytesWritable> prevRDD = tranToRDDMap.get(parentTran);
+        for (SparkTran parent : parents) {
+          JavaPairRDD<HiveKey, BytesWritable> prevRDD = tranToOutputRDDMap.get(parent);
           if (rdd == null) {
             rdd = prevRDD;
           } else {
             rdd = rdd.union(prevRDD);
           }
         }
+        rdd = tran.transform(rdd);
       }
-      rdd = tran.transform(rdd);
-      tranToRDDMap.put(tran, rdd);
+
+      tranToOutputRDDMap.put(tran, rdd);
     }
 
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
-      JavaPairRDD<HiveKey, BytesWritable> rdd = tranToRDDMap.get(leafTran);
+      JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran);
       if (finalRDD == null) {
         finalRDD = rdd;
       } else {
         finalRDD = finalRDD.union(rdd);
       }
     }
+
     finalRDD.foreach(HiveVoidFunction.getInstance());
   }
 
+  public void addTran(SparkTran tran) {
+    rootTrans.add(tran);
+    leafTrans.add(tran);
+  }
+
   /**
    * This method returns a topologically sorted list of SparkTran
    */
@@ -122,10 +124,10 @@ public class SparkPlan {
     if (getChildren(parent).contains(child)) {
       throw new IllegalStateException("Connection already exists");
     }
+    rootTrans.remove(child);
     leafTrans.remove(parent);
-    leafTrans.add(child);
-    if (transGraph.get(child) == null) {
-      transGraph.put(child, new LinkedList<SparkTran>());
+    if (transGraph.get(parent) == null) {
+      transGraph.put(parent, new LinkedList<SparkTran>());
     }
     if (invertedTransGraph.get(child) == null) {
       invertedTransGraph.put(child, new LinkedList<SparkTran>());
@@ -135,18 +137,19 @@ public class SparkPlan {
   }
 
   public List<SparkTran> getParents(SparkTran tran) throws IllegalStateException {
-    if (!invertedTransGraph.containsKey(tran)
-        || invertedTransGraph.get(tran) == null) {
-      throw new IllegalStateException("Cannot get parent transformations for " + tran);
+    if (!invertedTransGraph.containsKey(tran)) {
+      return new ArrayList<SparkTran>();
     }
-    return new LinkedList<SparkTran>(invertedTransGraph.get(tran));
+
+    return invertedTransGraph.get(tran);
   }
 
   public List<SparkTran> getChildren(SparkTran tran) throws IllegalStateException {
-    if (!transGraph.containsKey(tran) || transGraph.get(tran) == null) {
-      throw new IllegalStateException("Cannot get children transformations for " + tran);
+    if (!transGraph.containsKey(tran)) {
+      return new ArrayList<SparkTran>();
     }
-    return new LinkedList<SparkTran>(transGraph.get(tran));
+
+    return transGraph.get(tran);
   }
 
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1630642&r1=1630641&r2=1630642&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Fri Oct 10 04:51:29 2014
@@ -74,27 +74,30 @@ public class SparkPlanGenerator {
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
-    SparkPlan result = new SparkPlan();
-    Map<BaseWork, SparkTran> createdTransMap = new HashMap<BaseWork, SparkTran>();
+    SparkPlan sparkPlan = new SparkPlan();
+    Map<BaseWork, SparkTran> workToTranMap = new HashMap<BaseWork, SparkTran>();
 
     for (BaseWork work : sparkWork.getAllWork()) {
       SparkTran tran;
       if (work instanceof MapWork) {
-        JavaPairRDD<HiveKey, BytesWritable> inputRDD = generateRDD((MapWork)work);
+        MapInput mapInput = generateMapInput((MapWork)work);
+        sparkPlan.addTran(mapInput);
         tran = generate(work, null);
-        result.addInput(tran, inputRDD);
+        sparkPlan.addTran(tran);
+        sparkPlan.connect(mapInput, tran);
       } else {
         List<BaseWork> parentWorks = sparkWork.getParents(work);
         tran = generate(work, sparkWork.getEdgeProperty(parentWorks.get(0), work));
+        sparkPlan.addTran(tran);
         for (BaseWork parentWork : parentWorks) {
-          SparkTran parentTran = createdTransMap.get(parentWork);
-          result.connect(parentTran, tran);
+          SparkTran parentTran = workToTranMap.get(parentWork);
+          sparkPlan.connect(parentTran, tran);
         }
       }
-      createdTransMap.put(work, tran);
+      workToTranMap.put(work, tran);
     }
 
-    return result;
+    return sparkPlan;
   }
 
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
@@ -144,13 +147,14 @@ public class SparkPlanGenerator {
     }
   }
 
-  private JavaPairRDD<HiveKey, BytesWritable> generateRDD(MapWork mapWork)
+  private MapInput generateMapInput(MapWork mapWork)
       throws Exception {
     JobConf jobConf = cloneJobConf(mapWork);
     Class ifClass = getInputFormat(jobConf, mapWork);
 
-    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
-        Writable.class);
+    JavaPairRDD<HiveKey, BytesWritable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
+        WritableComparable.class, Writable.class);
+    return new MapInput(hadoopRDD);
   }
 
   private SparkShuffler generate(SparkEdgeProperty edge) {



Mime
View raw message