spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] tgravescs commented on a change in pull request #30164: [SPARK-32919][SHUFFLE] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions
Date Tue, 10 Nov 2020 20:08:44 GMT

tgravescs commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r520812097



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle on the client side and this works in" +
+        "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl" +
+        "which needs to be set with the appropriate" +
+        "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" +
+        "shuffle to be enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+      .doc("Maximum number of shuffle push mergers locations cached for push based shuffle." +
+        "Currently Shuffle push merger locations are nothing but shuffle services where an" +
+

Review comment:
       nit: Shuffle -> shuffle

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle on the client side and this works in" +
+        "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl" +
+        "which needs to be set with the appropriate" +
+        "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" +
+        "shuffle to be enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+      .doc("Maximum number of shuffle push mergers locations cached for push based shuffle." +
+        "Currently Shuffle push merger locations are nothing but shuffle services where an" +
+        "executor is launched in the case of Push based shuffle.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(500)
+
+  private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+    ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+      .doc("Minimum percentage of shuffle push mergers locations required to enable push based" +
+        "shuffle for the stage with respect to number of partitions of the child stage. This is" +
+        " the number of unique Node Manager locations needed to enable push based shuffle.")

Review comment:
       Perhaps we can reword a bit: The minimum number of shuffle merger locations required to enable push based shuffle for a stage. This is specified as a ratio of the number of partitions in the child stage. For example, a reduce stage which has 100 partitions and uses the default value 0.05 requires at least 5 unique merger locations to enable push based shuffle. Merger locations are currently defined as external shuffle services.

   You don't have to use the exact text here; just thought something like this might be a bit clearer.
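
   As a concrete reading of that suggested wording, a minimal Scala sketch of the ratio check (requiredMergers and its parameter names are illustrative, not the PR's actual code):

    // Sketch: number of unique merger locations needed for a stage, given the
    // ratio config. ceil so any fractional requirement rounds up.
    def requiredMergers(numChildStagePartitions: Int, mergersMinThresholdRatio: Double): Int =
      math.ceil(numChildStagePartitions * mergersMinThresholdRatio).toInt

    requiredMergers(100, 0.05)  // 5, matching the 100-partition example above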

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot

Review comment:
       can we add the JIRA number here, assuming that change is coming later
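
   For reference, the fallback the scaladoc above describes could look roughly like this (a minimal sketch with hypothetical names, not the PR's implementation):

    // Sketch: prefer hosts with active executors, then fall back to hosts where
    // executors previously ran, until enough unique merger locations exist.
    def pickMergerHosts(
        activeExecutorHosts: Seq[String],
        pastExecutorHosts: Seq[String],
        numMergersNeeded: Int): Seq[String] =
      (activeExecutorHosts ++ pastExecutorHosts).distinct.take(numMergersNeeded)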

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -526,6 +548,8 @@ class BlockManagerMasterEndpoint(
 
       blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
         maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)
+
+      addMergerLocation(id)

Review comment:
       seems like we can skip this if push based shuffle is disabled or the external shuffle service is disabled.
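
   i.e. a guard along these lines (sketch only; the flag names are placeholders for however the settings end up being read here):

    // Sketch: only track merger locations when both features are enabled.
    if (pushBasedShuffleEnabled && externalShuffleServiceEnabled) {
      addMergerLocation(id)
    }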

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle on the client side and this works in" +

Review comment:
       nit: need to add spaces at the end of each line before the closing ", this goes for the configs below as well
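
   For context on why the trailing spaces matter: Scala string concatenation inserts nothing between operands, so adjacent words in the doc text run together. A small illustration:

    val bad  = "this works in" + "conjunction with"   // "this works inconjunction with"
    val good = "this works in " + "conjunction with"  // "this works in conjunction with"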

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1938,4 +1938,42 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle on the client side and this works in" +
+        "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl" +
+        "which needs to be set with the appropriate" +
+        "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based" +
+        "shuffle to be enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
+      .doc("Maximum number of shuffle push mergers locations cached for push based shuffle." +
+        "Currently Shuffle push merger locations are nothing but shuffle services where an" +
+        "executor is launched in the case of Push based shuffle.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(500)
+
+  private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
+    ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
+      .doc("Minimum percentage of shuffle push mergers locations required to enable push based" +
+        "shuffle for the stage with respect to number of partitions of the child stage. This is" +
+        " the number of unique Node Manager locations needed to enable push based shuffle.")
+      .version("3.1.0")
+      .doubleConf
+      .createWithDefault(0.05)
+
+  private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
+    ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
+      .doc("Minimum static number of of shuffle push mergers locations should be available in" +
+        " order to enable push based shuffle for a stage. Note this config works in" +
+        " conjunction with spark.shuffle.push.mergersMinThresholdRatio")

Review comment:
       we should specify how this works in conjunction -> the max of those 2 is used
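
   i.e. something along these lines (a sketch of the suggested "max of the two" semantics; the names are illustrative, not the PR's code):

    // Sketch: the effective minimum number of merger locations is the larger of
    // the static threshold and the ratio applied to the child stage's partitions.
    def minMergersNeeded(
        numChildStagePartitions: Int,
        mergersMinThresholdRatio: Double, // spark.shuffle.push.mergersMinThresholdRatio
        mergersMinStaticThreshold: Int): Int = // spark.shuffle.push.mergersMinStaticThreshold
      math.max(
        math.ceil(numChildStagePartitions * mergersMinThresholdRatio).toInt,
        mergersMinStaticThreshold)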




