spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject [spark] branch master updated: [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
Date Fri, 22 Mar 2019 18:27:10 GMT
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d546f  [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
78d546f is described below

commit 78d546fe15aebcbf4b671c44383ddcf82b05b8a7
Author: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
AuthorDate: Fri Mar 22 11:26:53 2019 -0700

    [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
    
    ## What changes were proposed in this pull request?
    
    This patch proposes ManifestFileCommitProtocol to clean up incomplete output files in task level if task aborts. Please note that this works as 'best-effort', not kind of guarantee, as we have in HadoopMapReduceCommitProtocol.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #24154 from HeartSaVioR/SPARK-27210.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
---
 .../streaming/ManifestFileCommitProtocol.scala     |  7 ++++--
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8..916bd2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    // Do nothing
-    // TODO: we can also try delete the addedFiles as a best-effort cleanup.
+    // best effort cleanup of incomplete files
+    if (addedFiles.nonEmpty) {
+      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
+      addedFiles.foreach { file => fs.delete(new Path(file), false) }
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 619d118..020ab23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.nio.file.Files
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest {
       checkDatasetUnorderly(outputDf, 1, 2, 3)
     }
   }
+
+  testQuietly("cleanup incomplete output for aborted task") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk")
+      val outputDir = new File(tempDir, "output")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3)
+      val q = inputData.toDS().map(_ / 0)
+        .writeStream
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .format("parquet")
+        .start(outputDir.getCanonicalPath)
+
+      intercept[StreamingQueryException] {
+        try {
+          q.processAllAvailable()
+        } finally {
+          q.stop()
+        }
+      }
+
+      val outputFiles = Files.walk(outputDir.toPath).iterator().asScala
+        .filter(_.toString.endsWith(".parquet"))
+      assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message