spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mridul...@apache.org
Subject spark git commit: [SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism is set
Date Tue, 23 Jan 2018 12:08:40 GMT
Repository: spark
Updated Branches:
  refs/heads/master b2ce17b4c -> 96cb60bc3


[SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism
is set

## What changes were proposed in this pull request?

#20002 purposed a way to safe check the default partitioner, however, if `spark.default.parallelism`
is set, the defaultParallelism still could be smaller than the proper number of partitions
for upstreams RDDs. This PR tries to extend the approach to address the condition when `spark.default.parallelism`
is set.

The requirements where the PR helps with are :
- Max partitioner is not eligible since it is atleast an order smaller, and
- User has explicitly set 'spark.default.parallelism', and
- Value of 'spark.default.parallelism' is lower than max partitioner
- Since max partitioner was discarded due to being at least an order smaller, default parallelism
is worse - even though user specified.

Under the rest cases, the changes should be no-op.

## How was this patch tested?

Add corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #20091 from jiangxb1987/partitioner.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96cb60bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96cb60bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96cb60bc

Branch: refs/heads/master
Commit: 96cb60bc33936c1aaf728a1738781073891480ff
Parents: b2ce17b
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Authored: Tue Jan 23 04:08:32 2018 -0800
Committer: Mridul Muralidharan <mridul@gmail.com>
Committed: Tue Jan 23 04:08:32 2018 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    | 51 +++++++++++---------
 .../org/apache/spark/PartitioningSuite.scala    | 44 ++++++++++++++---
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 45 ++++++++++++++++-
 3 files changed, 108 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96cb60bc/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 437bbaa..c940cb2 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -43,17 +43,19 @@ object Partitioner {
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
-   * If any of the RDDs already has a partitioner, and the number of partitions of the
-   * partitioner is either greater than or is less than and within a single order of
-   * magnitude of the max number of upstream partitions, choose that one.
+   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
+   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
    *
-   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
-   * spark.default.parallelism is set, then we'll use the value from SparkContext
-   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
+   * When available, we choose the partitioner from rdds with maximum number of partitions.
If this
+   * partitioner is eligible (number of partitions within an order of maximum number of partitions
+   * in rdds), or has partition number higher than default partitions number - we use this
+   * partitioner.
    *
-   * Unless spark.default.parallelism is set, the number of partitions will be the
-   * same as the number of partitions in the largest upstream RDD, as this should
-   * be least likely to cause out-of-memory errors.
+   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
+   *
+   * Unless spark.default.parallelism is set, the number of partitions will be the same as
the
+   * number of partitions in the largest upstream RDD, as this should be least likely to
cause
+   * out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
@@ -67,31 +69,32 @@ object Partitioner {
       None
     }
 
-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism"))
{
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get,
rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
       hasMaxPartitioner.get.partitioner.get
     } else {
-      if (rdd.context.conf.contains("spark.default.parallelism")) {
-        new HashPartitioner(rdd.context.defaultParallelism)
-      } else {
-        new HashPartitioner(rdds.map(_.partitions.length).max)
-      }
+      new HashPartitioner(defaultNumPartitions)
     }
   }
 
   /**
-   * Returns true if the number of partitions of the RDD is either greater
-   * than or is less than and within a single order of magnitude of the
-   * max number of upstream partitions;
-   * otherwise, returns false
+   * Returns true if the number of partitions of the RDD is either greater than or is less
than and
+   * within a single order of magnitude of the max number of upstream partitions, otherwise
returns
+   * false.
    */
   private def isEligiblePartitioner(
-     hasMaxPartitioner: Option[RDD[_]],
+     hasMaxPartitioner: RDD[_],
      rdds: Seq[RDD[_]]): Boolean = {
-    if (hasMaxPartitioner.isEmpty) {
-      return false
-    }
     val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
+    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96cb60bc/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 155ca17..9206b5d 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -262,14 +262,11 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext
with Priva
 
   test("defaultPartitioner") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-    val rdd2 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(10))
-    val rdd3 = sc
-      .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+    val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
-    val rdd4 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
 
@@ -284,7 +281,42 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext
with Priva
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
     assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+  }
 
+  test("defaultPartitioner when defaultParallelism is set") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
+      val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(10))
+      val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+        .partitionBy(new HashPartitioner(100))
+      val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(9))
+      val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
+      val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(3))
+
+      val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
+      val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
+      val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
+      val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+      val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
+      val partitioner6 = Partitioner.defaultPartitioner(rdd5, rdd5)
+      val partitioner7 = Partitioner.defaultPartitioner(rdd1, rdd6)
+
+      assert(partitioner1.numPartitions == rdd2.getNumPartitions)
+      assert(partitioner2.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner3.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+      assert(partitioner6.numPartitions == sc.defaultParallelism)
+      assert(partitioner7.numPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/96cb60bc/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index a39e046..47af5c3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -322,8 +322,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext
{
   }
 
   // See SPARK-22465
-  test("cogroup between multiple RDD" +
-    " with number of partitions similar in order of magnitude") {
+  test("cogroup between multiple RDD with number of partitions similar in order of magnitude")
{
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
       .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
@@ -332,6 +331,48 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext
{
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
   }
 
+  test("cogroup between multiple RDD when defaultParallelism is set without proper partitioner")
{
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set with proper partitioner")
{
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set; with huge number of
" +
+    "partitions in upstream RDDs") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message