spark-commits mailing list archives

From hvanhov...@apache.org
Subject spark git commit: [SPARK-23989][SQL] exchange should copy data before non-serialized shuffle
Date Thu, 19 Apr 2018 15:55:09 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7fb11176f -> fb968215c


[SPARK-23989][SQL] exchange should copy data before non-serialized shuffle

## What changes were proposed in this pull request?

In Spark SQL we usually reuse the `UnsafeRow` instance, so the data has to be copied whenever something downstream buffers the rows as non-serialized objects.
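
To make the hazard concrete, here is a minimal standalone sketch (plain Scala, not Spark code; `MutableRow` is a made-up stand-in for the reused `UnsafeRow`): buffering a reused mutable row without `copy()` leaves every buffered reference pointing at the last value written.

```scala
// Minimal illustration of the reuse hazard; MutableRow is a hypothetical stand-in.
object ReusedRowDemo {
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
  }

  def main(args: Array[String]): Unit = {
    // The producer reuses one instance and mutates it in place.
    val reused = new MutableRow(0)
    val rows = Iterator(1, 2, 3).map { v => reused.value = v; reused }

    // Buffering without copy(): all three buffered entries alias the same object.
    val buffered = rows.toArray
    println(buffered.map(_.value).mkString(","))  // prints 3,3,3

    // Copying before buffering preserves each row's contents.
    val reused2 = new MutableRow(0)
    val copied = Iterator(1, 2, 3).map { v => reused2.value = v; reused2.copy() }.toArray
    println(copied.map(_.value).mkString(","))    // prints 1,2,3
  }
}
```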

The shuffle may buffer objects if we end up on neither the bypass-merge shuffle path nor the unsafe (serialized) shuffle path.

`ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` misses the case where, if `spark.sql.shuffle.partitions` is large enough, the unsafe shuffle cannot be used and we fall back to the non-serialized shuffle.

This bug is very hard to hit in practice, since users would rarely set such a large number of partitions (over 16 million) for a Spark SQL exchange.
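
For illustration only, a simplified standalone paraphrase of the intended decision (the `needToCopy` helper and `MaxSerializedModePartitions` name are made up for this sketch; in Spark the actual limit is `SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE`, i.e. 1 << 24 = 16,777,216 partitions):

```scala
// Sketch of the three sort-based shuffle paths; only the last one buffers
// deserialized rows and therefore requires a defensive copy.
object NeedToCopySketch {
  // The serialized (unsafe) shuffle packs the partition id into 24 bits,
  // hence the ~16 million partition limit mentioned above.
  val MaxSerializedModePartitions: Int = 1 << 24  // 16777216

  def needToCopy(numPartitions: Int, bypassMergeThreshold: Int = 200): Boolean = {
    if (numPartitions <= bypassMergeThreshold) {
      false  // bypass-merge path: records go straight to per-partition files
    } else if (numPartitions <= MaxSerializedModePartitions) {
      false  // serialized shuffle: sorts serialized bytes, no object buffering
    } else {
      true   // falls back to ExternalSorter, which buffers deserialized rows
    }
  }

  def main(args: Array[String]): Unit = {
    println(needToCopy(100))            // false: bypass-merge shuffle
    println(needToCopy(10000))          // false: serialized shuffle
    println(needToCopy((1 << 24) + 1))  // true: the previously missed case
  }
}
```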

TODO: test

## How was this patch tested?

todo.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21101 from cloud-fan/shuffle.

(cherry picked from commit 6e19f7683fc73fabe7cdaac4eb1982d2e3e607b7)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb968215
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb968215
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb968215

Branch: refs/heads/branch-2.3
Commit: fb968215ca014c5cf40097a3c4588bbee11e2c02
Parents: 7fb1117
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu Apr 19 17:54:53 2018 +0200
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Thu Apr 19 17:55:06 2018 +0200

----------------------------------------------------------------------
 .../exchange/ShuffleExchangeExec.scala          | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb968215/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 4d95ee3..b892037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -153,12 +153,9 @@ object ShuffleExchangeExec {
    * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
    *
    * @param partitioner the partitioner for the shuffle
-   * @param serializer the serializer that will be used to write rows
    * @return true if rows should be copied before being shuffled, false otherwise
    */
-  private def needToCopyObjectsBeforeShuffle(
-      partitioner: Partitioner,
-      serializer: Serializer): Boolean = {
+  private def needToCopyObjectsBeforeShuffle(partitioner: Partitioner): Boolean = {
     // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
     // passed instead of directly passing the number of partitions in order to guard against
     // corner-cases where a partitioner constructed with `numPartitions` partitions may output
@@ -167,22 +164,24 @@ object ShuffleExchangeExec {
     val shuffleManager = SparkEnv.get.shuffleManager
     val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val numParts = partitioner.numPartitions
     if (sortBasedShuffleOn) {
-      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
+      if (numParts <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
         // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
         // doesn't buffer deserialized records.
         // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
         false
-      } else if (serializer.supportsRelocationOfSerializedObjects) {
+      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
         // SPARK-4550 and  SPARK-7081 extended sort-based shuffle to serialize individual records
         // prior to sorting them. This optimization is only applied in cases where shuffle
         // dependency does not specify an aggregator or ordering and the record serializer has
-        // certain properties. If this optimization is enabled, we can safely avoid the copy.
+        // certain properties and the number of partitions doesn't exceed the limitation. If this
+        // optimization is enabled, we can safely avoid the copy.
         //
-        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
-        // need to check whether the optimization is enabled and supported by our serializer.
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, and the
+        // serializer in Spark SQL always satisfy the properties, so we only need to check whether
+        // the number of partitions exceeds the limitation.
         false
       } else {
         // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
@@ -298,7 +297,7 @@ object ShuffleExchangeExec {
         rdd
       }

-      if (needToCopyObjectsBeforeShuffle(part, serializer)) {
+      if (needToCopyObjectsBeforeShuffle(part)) {
         newRdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }



