pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1773917 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java SparkUtil.java converter/RankConverter.java converter/SkewedJoinConverter.java converter/SortConverter.java
Date: Tue, 13 Dec 2016 03:41:31 GMT
Author: xuefu
Date: Tue Dec 13 03:41:31 2016
New Revision: 1773917

URL: http://svn.apache.org/viewvc?rev=1773917&view=rev
Log:
PIG-4952: Calculate the value of parallelism for spark mode (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java

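For context, the new property is read from the Pig job properties (see the SparkLauncher hunk below), so one plausible way to exercise the new code path, assuming the standard Pig properties mechanism, is (the value 200 is illustrative):

    # in conf/pig.properties
    spark.default.parallelism=200

    # then launch the script in Spark mode
    pig -x spark script.pig
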
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Dec 13 03:41:31 2016
@@ -664,9 +664,9 @@ public class SparkLauncher extends Launc
         SchemaTupleBackend.initialize(jobConf, pigContext);
         Utils.setDefaultTimeZone(jobConf);
         PigMapReduce.sJobConfInternal.set(jobConf);
-        String sparkReducers = pigContext.getProperties().getProperty("spark.reducers");
-        if (sparkReducers != null) {
-            SparkUtil.setSparkReducers(Integer.parseInt(sparkReducers));
+        String parallelism = pigContext.getProperties().getProperty("spark.default.parallelism");
+        if (parallelism != null) {
+            SparkUtil.setSparkDefaultParallelism(Integer.parseInt(parallelism));
         }
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Tue Dec 13 03:41:31 2016
@@ -59,7 +59,7 @@ import org.apache.spark.rdd.RDD;
 
 public class SparkUtil {
 
-    private static ThreadLocal<Integer> SPARK_REDUCERS = null;
+    private static ThreadLocal<Integer> SPARK_DEFAULT_PARALLELISM = null;
     private static ConcurrentHashMap<String, Broadcast<List<Tuple>>> broadcastedVars = new ConcurrentHashMap();
 
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -131,17 +131,25 @@ public class SparkUtil {
 
     public static int getParallelism(List<RDD<Tuple>> predecessors,
             PhysicalOperator physicalOperator) {
-        if (SPARK_REDUCERS != null) {
-            return getSparkReducers();
+        if (SPARK_DEFAULT_PARALLELISM != null) {
+            return getSparkDefaultParallelism();
         }
 
         int parallelism = physicalOperator.getRequestedParallelism();
         if (parallelism <= 0) {
-            // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
-            // is reasonable.
-            parallelism = predecessors.get(0).context().defaultParallelism();
+            // Spark automatically sets the number of "map" tasks to run on each file according to its size (though
+            // you can control it through optional parameters to SparkContext.textFile, etc), and for distributed
+            // "reduce" operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of
+            // partitions.
+            int maxParallelism = 0;
+            for (int i = 0; i < predecessors.size(); i++) {
+                int tmpParallelism = predecessors.get(i).getNumPartitions();
+                if (tmpParallelism > maxParallelism) {
+                    maxParallelism = tmpParallelism;
+                }
+            }
+            parallelism = maxParallelism;
         }
-
         return parallelism;
     }
 
@@ -179,11 +187,11 @@ public class SparkUtil {
     }
 
 
-    public static int getSparkReducers() {
-        return SPARK_REDUCERS.get();
+    public static int getSparkDefaultParallelism() {
+        return SPARK_DEFAULT_PARALLELISM.get();
     }
 
-    public static void setSparkReducers(int sparkReducers) {
-        SPARK_REDUCERS.set(sparkReducers);
+    public static void setSparkDefaultParallelism(int sparkDefaultParallelism) {
+        SPARK_DEFAULT_PARALLELISM.set(sparkDefaultParallelism);
     }
 }
\ No newline at end of file

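The fallback added in getParallelism() above mirrors Spark's documented convention of sizing a shuffle by the largest parent RDD's partition count. A minimal standalone sketch of the same idea against the public Spark Java API (class and variable names here are illustrative, not part of this commit):

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class ParallelismSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local[2]", "parallelism-sketch");
            // Two predecessors with different partition counts.
            JavaRDD<Integer> small = sc.parallelize(Arrays.asList(1, 2, 3), 2);
            JavaRDD<Integer> large = sc.parallelize(Arrays.asList(4, 5, 6, 7), 8);
            // Take the largest predecessor partition count, as SparkUtil.getParallelism() now does.
            int parallelism = Math.max(small.getNumPartitions(), large.getNumPartitions());
            // Pass it explicitly to the shuffle, as the converters in this commit do.
            JavaPairRDD<Integer, Iterable<Integer>> grouped = large
                    .mapToPair(x -> new Tuple2<>(x % 2, x))
                    .groupByKey(parallelism);
            System.out.println(grouped.getNumPartitions()); // 8, the larger parent's count
            sc.stop();
        }
    }
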
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java Tue Dec 13 03:41:31 2016
@@ -45,21 +45,22 @@ public class RankConverter implements RD
 	@Override
 	public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
 			throws IOException {
-		SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+        int parallelism = SparkUtil.getParallelism(predecessors, poRank);
+        SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-		JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
-				.mapToPair(new ToPairRdd());
-		JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
-				.groupByKey();
-		JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
-				.mapToPair(new IndexCounters());
-		JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
-				.sortByKey(true);
-		Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
-		JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
-				.map(new RankFunction(new HashMap<Integer, Long>(counts)));
-		return finalRdd.rdd();
-	}
+        JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+                .mapToPair(new ToPairRdd());
+        JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+                .groupByKey(parallelism);
+        JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+                .mapToPair(new IndexCounters());
+        JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+                .sortByKey(true, parallelism);
+        Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+        JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+                .map(new RankFunction(new HashMap<Integer, Long>(counts)));
+        return finalRdd.rdd();
+    }
 
 	@SuppressWarnings("serial")
 	private static class ToPairRdd implements 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Tue Dec 13 03:41:31 2016
@@ -78,9 +78,10 @@ public class SkewedJoinConverter impleme
                 rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
 
+        int parallelism = SparkUtil.getParallelism(predecessors, poSkewedJoin);
        // join() returns (key, (t1, t2)) where (key, t1) is in this and (key, t2) is in other
         JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> result_KeyValue = rdd1Pair_javaRDD
-                .join(rdd2Pair_javaRDD);
+                .join(rdd2Pair_javaRDD, parallelism);
 
        // map to get JavaRDD<Tuple> from JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
         // ignoring the key (of type IndexedKey) and appending the values (the

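The join above now uses JavaPairRDD.join(other, numPartitions), whose int argument fixes the partition count of the shuffled result; the RankConverter and SortConverter hunks use the matching groupByKey(numPartitions) and sortByKey(comparator, ascending, numPartitions) overloads. An illustrative call with hypothetical inputs:

    // 'left' and 'right' are hypothetical JavaPairRDD<IndexedKey, Tuple> inputs;
    // 'parallelism' comes from SparkUtil.getParallelism(), as in the hunk above.
    JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> joined = left.join(right, parallelism);
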
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Tue Dec 13 03:41:31 2016
@@ -46,6 +46,7 @@ public class SortConverter implements RD
             throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
+        int parallelism = SparkUtil.getParallelism(predecessors, sortOperator);
         RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
                 SparkUtil.<Tuple, Object> getTuple2Manifest());
 
@@ -53,8 +54,9 @@ public class SortConverter implements RD
                 SparkUtil.getManifest(Tuple.class),
                 SparkUtil.getManifest(Object.class));
 
+
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
-                sortOperator.getMComparator(), true);
+                sortOperator.getMComparator(), true, parallelism);
         JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
 
         return mapped.rdd();


