spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] LantaoJin commented on a change in pull request #29378: [SPARK-30069][CORE][YARN] Clean up non-shuffle disk block manager files following executor exits on YARN
Date Mon, 10 Aug 2020 03:41:25 GMT

LantaoJin commented on a change in pull request #29378:
URL: https://github.com/apache/spark/pull/29378#discussion_r467674124



##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -46,18 +46,35 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
 
+  private def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf)
+
+  /* Create container directories on YARN to persist temporary files
+   * (temp_local, temp_shuffle).
+   * These files have no opportunity to be cleaned before the application ends on YARN.
+   * This is a real issue, especially for long-lived Spark applications like the
+   * Spark thrift-server. So we persist these files in YARN container directories,
+   * which are cleaned by YARN when the container exits. */
+  private[spark] val containerDirs: Array[File] =
+    if (containerDirEnabled) createContainerDirs(conf) else Array.empty[File]
+
   private[spark] val localDirsString: Array[String] = localDirs.map(_.toString)
 
   // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
   // of subDirs(i) is protected by the lock of subDirs(i).
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  private val subContainerDirs = if (containerDirEnabled) {
+    Array.fill(containerDirs.length)(new Array[File](subDirsPerLocalDir))
+  } else {
+    Array.empty[Array[File]]
+  }
+
   private val shutdownHook = addShutdownHook()
 
-  /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
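
For context on why container directories fix the cleanup problem: YARN's NodeManager removes a container's local directories when the container exits, so any file placed under them is cleaned up without Spark doing anything. Below is a minimal sketch of what `createContainerDirs` could look like; it assumes YARN's standard `CONTAINER_ID` and `LOCAL_DIRS` container environment variables, and the directory layout here is an illustration, not the PR's actual code.

```scala
import java.io.File

import org.apache.spark.SparkConf

// Hypothetical sketch: derive per-container directories from the environment
// variables YARN sets inside every container. Callers are assumed to have
// checked Utils.isRunningInYarnContainer(conf) first, so both variables exist.
private def createContainerDirs(conf: SparkConf): Array[File] = {
  val containerId = sys.env("CONTAINER_ID")           // set by YARN per container
  sys.env("LOCAL_DIRS").split(",").flatMap { root =>  // NodeManager local dirs
    // Assumed layout: one subdirectory per local dir, keyed by the container
    // id, so YARN deletes it together with the rest of the container's state.
    val dir = new File(root, containerId)
    if (dir.isDirectory || dir.mkdirs()) Some(dir) else None
  }
}
```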

Review comment:
       > Why does this function need to have unused parameters?
   
   Ah, you must mean that `getFile` in `DiskBlockManager` has 4 parameters while `getFile` in `ExecutorDiskUtils` has 3. `ExecutorDiskUtils` is a utility, so 3 parameters are enough to define the file path structure. The extra parameter `subDirs` in `DiskBlockManager` is just a local variable that is also used elsewhere; it does not affect the result of the lookup algorithm. I could use 3 parameters in `DiskBlockManager.getFile` as well, but then I would need to pick a different array based on a condition, like:
   ```scala
   private def getFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
     ...
     val properSubDirs = if (someConditions) {
       subDirs
     } else {
       subContainerDirs
     }
     val old = properSubDirs(dirId)(subDirId)
     ...
   }
   ```
   But `someConditions` is a little hard to determine inside this method.
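   
   For reference, the lookup that has to stay in sync between the two methods (see `ExecutorDiskUtils#getFile`) hashes the filename, picks a parent dir by `hash % localDirs.length`, and a subdirectory by `(hash / localDirs.length) % subDirsPerLocalDir`. Here is a sketch that instead makes the condition explicit as a hypothetical `useContainerDirs` flag, so every parameter is used:
   
   ```scala
   import java.io.{File, IOException}
   
   import org.apache.spark.util.Utils
   
   // Sketch only: assumes it lives inside DiskBlockManager, next to the
   // subDirs and subContainerDirs arrays. useContainerDirs is a hypothetical
   // flag standing in for the hard-to-determine someConditions above.
   private def getFile(
       localDirs: Array[File],
       subDirsPerLocalDir: Int,
       filename: String,
       useContainerDirs: Boolean): File = {
     // Keep in sync with ExecutorDiskUtils#getFile: same hash, same layout.
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
   
     val properSubDirs = if (useContainerDirs) subContainerDirs else subDirs
     // Create the subdirectory lazily; the lock of properSubDirs(dirId)
     // protects its contents, matching the comment on subDirs above.
     val subDir = properSubDirs(dirId).synchronized {
       val old = properSubDirs(dirId)(subDirId)
       if (old != null) {
         old
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists() && !newDir.mkdir()) {
           throw new IOException(s"Failed to create local dir in $newDir.")
         }
         properSubDirs(dirId)(subDirId) = newDir
         newDir
       }
     }
     new File(subDir, filename)
   }
   ```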
   



