spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files
Date Wed, 02 Nov 2016 06:00:31 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1bbf9ff63 -> 39d2fdb51


[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems
such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files
in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects
the use of a filesystem that doesn't use CRC files and removes the CRC files.
## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly
removes CRC files on the local POSIX filesystem.  Ran the entire regression suite.

Author: frreiss <frreiss@us.ibm.com>

Closes #15027 from frreiss/fred-17475.

(cherry picked from commit 620da3b4828b3580c7ed7339b2a07938e6be1bb1)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39d2fdb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39d2fdb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39d2fdb5

Branch: refs/heads/branch-2.1
Commit: 39d2fdb51233ed9b1aaf3adaa3267853f5e58c0f
Parents: 1bbf9ff
Author: frreiss <frreiss@us.ibm.com>
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue Nov 1 23:00:28 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +++++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path:
String)
           // It will fail if there is an existing file (someone has committed the batch)
           logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
           fileManager.rename(tempPath, batchIdToPath(batchId))
+
+          // SPARK-17475: HDFSMetadataLog should not leak CRC files
+          // If the underlying filesystem didn't rename the CRC file, delete it.
+          val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+          if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
           return
         } catch {
           case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext
{
       assert(metadataLog.get(1).isEmpty)
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
+
+      // There should be exactly one file, called "2", in the metadata directory.
+      // This check also tests for regressions of SPARK-17475
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size == 1)
+      assert(allFiles(0).getName() == "2")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message