spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully removed
Date Tue, 15 Aug 2017 15:48:04 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 9b749b6ce -> 6f366fbbf


[SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully
removed

## What changes were proposed in this pull request?

Backport SPARK-21721 to branch 2.1:

We put staging path to delete into the deleteOnExit cache of FileSystem in case of the path
can't be successfully removed. But when we successfully remove the path, we don't remove it
from the cache. We should do it to avoid continuing grow the cache size.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18947 from viirya/SPARK-21721-backport-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f366fbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f366fbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f366fbb

Branch: refs/heads/branch-2.1
Commit: 6f366fbbf8dc0a00050040891635e1caae8a4faa
Parents: 9b749b6
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Tue Aug 15 08:48:00 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Tue Aug 15 08:48:00 2017 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  8 +++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 21 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3b9c2fc..3567819 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -382,7 +382,13 @@ case class InsertIntoHiveTable(
     // Attempt to delete the staging directory and the inclusive files. If failed, the files
are
     // expected to be dropped at the normal termination of VM since deleteOnExit is used.
     try {
-      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true)
}
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's
cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
     } catch {
       case NonFatal(e) =>
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1619115..73ceaf8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.hive.execution
 import java.io.{File, PrintWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
+import java.util.Set
 
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -2031,4 +2032,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
       checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
     }
   }
+
+  test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed")
{
+    withTable("test21721") {
+      val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
+      deleteOnExitField.setAccessible(true)
+
+      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+      val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
+
+      val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
+      sql("CREATE TABLE test21721 (key INT, value STRING)")
+      val pathSizeToDeleteOnExit = setOfPath.size()
+
+      (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))
+
+      assert(setOfPath.size() == pathSizeToDeleteOnExit)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message