spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-21721][SQL] Clear FileSystem deleteOnExit cache when paths are successfully removed
Date Tue, 15 Aug 2017 05:29:35 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 48bacd36c -> d9c8e6223


[SPARK-21721][SQL] Clear FileSystem deleteOnExit cache when paths are successfully removed

## What changes were proposed in this pull request?

We put the staging path into the deleteOnExit cache of `FileSystem` so that it is still removed at
VM exit in case we can't delete it successfully. But when we do delete the path successfully, we
never remove it from the cache. We should remove it to keep the cache from growing indefinitely.
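As an illustrative sketch of the delete-then-cancel pattern (not the patch itself; the actual change is in `InsertIntoHiveTable.scala`, shown in the diff below). The `stagingDir` path and fresh `Configuration` here are hypothetical stand-ins:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeleteOnExitSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Hypothetical staging directory; InsertIntoHiveTable derives the real one per query.
    val stagingDir = new Path("/tmp/.hive-staging_sketch")
    val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)

    // Register the path so it is still removed at JVM exit if the explicit delete fails.
    fs.deleteOnExit(stagingDir)

    // ... job writes its output under stagingDir and commits it ...

    if (fs.delete(stagingDir, true)) {
      // The path is already gone, so drop it from FileSystem's deleteOnExit set;
      // otherwise the set keeps growing for the lifetime of the cached FileSystem.
      fs.cancelDeleteOnExit(stagingDir)
    }
  }
}
```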

## How was this patch tested?

Added a test.
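
For context, the added test in `SQLQuerySuite` (full diff below) verifies the fix by reading `FileSystem`'s private `deleteOnExit` set via reflection and asserting that its size does not grow after successful inserts. A minimal standalone sketch of that reflection check, assuming the local default `FileSystem`:

```scala
import java.util.{Set => JSet}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeleteOnExitInspection {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    // Hadoop's FileSystem keeps the paths scheduled for deletion at JVM exit
    // in a private field named "deleteOnExit".
    val field = classOf[FileSystem].getDeclaredField("deleteOnExit")
    field.setAccessible(true)
    val deleteOnExitPaths = field.get(fs).asInstanceOf[JSet[Path]]

    val sizeBefore = deleteOnExitPaths.size()
    // ... run writes that create and then successfully delete staging directories ...
    assert(deleteOnExitPaths.size() == sizeBefore)
  }
}
```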

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18934 from viirya/SPARK-21721.

(cherry picked from commit 4c3cf1cc5cdb400ceef447d366e9f395cd87b273)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9c8e622
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9c8e622
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9c8e622

Branch: refs/heads/branch-2.2
Commit: d9c8e6223f6b31bfbca33b1064ead9720cfefa10
Parents: 48bacd3
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Mon Aug 14 22:29:15 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Mon Aug 14 22:29:27 2017 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  8 ++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 22 ++++++++++++++++++--
 2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9c8e622/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 797481c..66ee5d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -393,7 +393,13 @@ case class InsertIntoHiveTable(
     // Attempt to delete the staging directory and the inclusive files. If failed, the files are
     // expected to be dropped at the normal termination of VM since deleteOnExit is used.
     try {
-      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
     } catch {
       case NonFatal(e) =>
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

http://git-wip-us.apache.org/repos/asf/spark/blob/d9c8e622/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c944f28..002ddd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.util.Locale
+import java.util.{Locale, Set}
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.TestUtils
 import org.apache.spark.sql._
@@ -2015,4 +2015,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
     }
   }
+
+  test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed")
{
+    withTable("test21721") {
+      val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
+      deleteOnExitField.setAccessible(true)
+
+      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+      val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
+
+      val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
+      sql("CREATE TABLE test21721 (key INT, value STRING)")
+      val pathSizeToDeleteOnExit = setOfPath.size()
+
+      (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))
+
+      assert(setOfPath.size() == pathSizeToDeleteOnExit)
+    }
+  }
 }

