hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1614825 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: HiveMapFunction.java HiveReduceFunction.java ReduceTran.java SparkPlanGenerator.java
Date: Thu, 31 Jul 2014 08:20:20 GMT
Author: xuefu
Date: Thu Jul 31 08:20:19 2014
New Revision: 1614825

URL: http://svn.apache.org/r1614825
Log:
HIVE-7526: Research to use groupby transformation to replace Hive existing partitionByKey
and SparkCollector combination [Spark Branch]
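
In effect, the shuffle between map and reduce is now expressed as Spark's groupByKey
transformation, so the reduce side receives values already clustered per key instead of
clustering them itself. As a stand-alone illustration only (not part of this commit;
Spark 1.x Java API, with String keys/values to keep it self-contained, whereas the Hive
functions below operate on BytesWritable), here is a minimal sketch of the
(key, Iterable<value>) shape that groupByKey produces:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class GroupByShapeSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("groupby-shape").setMaster("local"));

        // Pretend map-side output: two values share the key "k1".
        JavaPairRDD<String, String> mapOutput = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, String>("k1", "v1"),
            new Tuple2<String, String>("k1", "v2"),
            new Tuple2<String, String>("k2", "v3")));

        // After groupByKey, each key appears once with all of its values grouped;
        // this is the shape the rewritten HiveReduceFunction consumes directly.
        JavaPairRDD<String, Iterable<String>> grouped = mapOutput.groupByKey();
        System.out.println(grouped.collect());

        sc.stop();
      }
    }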

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Thu Jul 31 08:20:19 2014
@@ -55,7 +55,6 @@ BytesWritable, BytesWritable> {
     collector.clear();
     while(it.hasNext() && !ExecMapper.getDone()) {
       Tuple2<BytesWritable, BytesWritable> input = it.next();
-      System.out.println("mapper input: " + input._1() + ", " + input._2());
       mapper.map(input._1(), input._2(), collector, Reporter.NULL);
     }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Thu Jul 31 08:20:19 2014
@@ -18,11 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.io.BytesWritable;
@@ -32,7 +28,7 @@ import org.apache.spark.api.java.functio
 
 import scala.Tuple2;
 
-public class HiveReduceFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable,BytesWritable>>,
+public class HiveReduceFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable,Iterable<BytesWritable>>>,
 BytesWritable, BytesWritable> {
   private static final long serialVersionUID = 1L;
 
@@ -47,8 +43,8 @@ BytesWritable, BytesWritable> {
   }
 
   @Override
-  public Iterable<Tuple2<BytesWritable, BytesWritable>> call(Iterator<Tuple2<BytesWritable,BytesWritable>> it)
-      throws Exception {
+  public Iterable<Tuple2<BytesWritable, BytesWritable>>
+  call(Iterator<Tuple2<BytesWritable,Iterable<BytesWritable>>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
       jobConf.set("mapred.reducer.class", ExecReducer.class.getName());      
@@ -59,25 +55,9 @@ BytesWritable, BytesWritable> {
     }
 
     collector.clear();
-    Map<BytesWritable, List<BytesWritable>> clusteredRows = 
-        new HashMap<BytesWritable, List<BytesWritable>>();
     while (it.hasNext()) {
-      Tuple2<BytesWritable, BytesWritable> input = it.next();
-      BytesWritable key = input._1();
-      BytesWritable value = input._2();
-      System.out.println("reducer row: " + key + "/" + value);
-      // cluster the input according to key.
-      List<BytesWritable> valueList = clusteredRows.get(key);
-      if (valueList == null) {
-        valueList = new ArrayList<BytesWritable>();
-        clusteredRows.put(key, valueList);
-      }
-      valueList.add(value);
-    }
-
-    for (Map.Entry<BytesWritable, List<BytesWritable>> entry : clusteredRows.entrySet()) {
-      // pass on the clustered result to the reducer operator tree.
-      reducer.reduce(entry.getKey(), entry.getValue().iterator(), collector, Reporter.NULL);
+      Tuple2<BytesWritable, Iterable<BytesWritable>> tup = it.next();
+      reducer.reduce(tup._1(), tup._2().iterator(), collector, Reporter.NULL);
     }
 
     reducer.close();
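
The reworked call() above no longer clusters rows itself; it assumes the upstream shuffle
already delivers one (key, Iterable<value>) pair per key. The SparkShuffler and
GroupByShuffler types referenced later in this commit are not part of the diff, so the
following is only a hypothetical sketch of what that contract could look like, consistent
with the new signature (names and details assumed):

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    // Hypothetical contract: turn plain (key, value) map output into grouped pairs.
    interface SparkShufflerSketch {
      JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
          JavaPairRDD<BytesWritable, BytesWritable> input);
    }

    // Hypothetical group-by implementation: groupByKey does the partitioning and
    // per-key clustering that the removed HashMap code used to do by hand.
    class GroupByShufflerSketch implements SparkShufflerSketch {
      @Override
      public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
          JavaPairRDD<BytesWritable, BytesWritable> input) {
        return input.groupByKey();
      }
    }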

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Thu Jul 31 08:20:19 2014
@@ -22,16 +22,21 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class ReduceTran implements SparkTran {
+  private SparkShuffler shuffler;
   private HiveReduceFunction reduceFunc;
 
   @Override
   public JavaPairRDD<BytesWritable, BytesWritable> transform(
       JavaPairRDD<BytesWritable, BytesWritable> input) {
-    return input.mapPartitionsToPair(reduceFunc);
+    return shuffler.shuffle(input).mapPartitionsToPair(reduceFunc);
   }
 
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
 
+  public void setShuffler(SparkShuffler shuffler) {
+    this.shuffler = shuffler;
+  }
+
 }
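
With the shuffler injected, transform() now composes two stages: the shuffle changes the
value type to Iterable<BytesWritable>, and the per-partition reduce brings it back to plain
(key, value) pairs. A simplified, hypothetical mirror of that type flow (written out only to
make the intermediate type visible; it assumes shuffle() returns the grouped pair RDD):

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    class ReduceTranTypeFlow {
      JavaPairRDD<BytesWritable, BytesWritable> transform(
          JavaPairRDD<BytesWritable, BytesWritable> mapOutput,
          SparkShuffler shuffler, HiveReduceFunction reduceFunc) {
        // 1) shuffle: (key, value) pairs become (key, Iterable<value>) pairs.
        JavaPairRDD<BytesWritable, Iterable<BytesWritable>> grouped =
            shuffler.shuffle(mapOutput);
        // 2) reduce: HiveReduceFunction feeds each grouped pair to ExecReducer,
        //    producing plain (key, value) output for downstream transformations.
        return grouped.mapPartitionsToPair(reduceFunc);
      }
    }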

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Jul 31 08:20:19 2014
@@ -62,10 +62,12 @@ public class SparkPlanGenerator {
     MapWork mapWork = (MapWork) w;
     trans.add(generate(w));
     while (sparkWork.getChildren(w).size() > 0) {
-      BaseWork child = sparkWork.getChildren(w).get(0);
+      ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
       SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
-      trans.add(generate(edge));
-      trans.add(generate(child));
+      SparkShuffler st = generate(edge);
+      ReduceTran rt = generate(child);
+      rt.setShuffler(st);
+      trans.add(rt);
       w = child;
     }
     ChainedTran chainedTran = new ChainedTran(trans);
@@ -107,9 +109,9 @@ public class SparkPlanGenerator {
     return result;
   }
 
-  private ShuffleTran generate(SparkEdgeProperty edge) {
-    // TODO: based on edge type, create groupBy or sortBy transformations.
-    return new ShuffleTran();
+  private SparkShuffler generate(SparkEdgeProperty edge) {
+    // TODO: create different shuffler based on edge prop.
+    return new GroupByShuffler();
   }
 
   private ReduceTran generate(ReduceWork rw) throws IOException {


