spark-commits mailing list archives

From: sro...@apache.org
Subject: spark git commit: [SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions
Date: Tue, 13 Jun 2017 09:48:19 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 dae1a9875 -> 24836be54


[SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions

## What changes were proposed in this pull request?

Don't leave the thread pool running after the AlterTableRecoverPartitionsCommand DDL command completes.
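
A minimal, self-contained sketch of the pattern this change applies, not the Spark code itself (`PoolPerCall` and `scanInParallel` are hypothetical names; Scala 2.11-era parallel collections are assumed, matching branch-2.2): create the pool per call, hand it to the parallel collection via `ForkJoinTaskSupport`, force evaluation with `.seq`, and shut the pool down in `finally`.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object PoolPerCall {
  def scanInParallel(paths: Seq[String]): Seq[String] = {
    // Bounded parallelism, created per invocation rather than held in a field.
    val pool = new ForkJoinPool(8)
    try {
      val par = paths.par
      par.tasksupport = new ForkJoinTaskSupport(pool)
      // .seq forces evaluation back to a sequential collection before the
      // pool is shut down, mirroring the .seq call in the patch.
      par.map(_.toUpperCase).seq
    } finally {
      pool.shutdown() // released even if the scan throws
    }
  }
}
```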

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #18216 from srowen/SPARK-20920.

(cherry picked from commit 7b7c85ede398996aafffb126440e5f0c67f67210)
Signed-off-by: Sean Owen <sowen@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24836be5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24836be5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24836be5

Branch: refs/heads/branch-2.2
Commit: 24836be5450ef741beff4967c8adcc50bea25f41
Parents: dae1a98
Author: Sean Owen <sowen@cloudera.com>
Authored: Tue Jun 13 10:48:07 2017 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Tue Jun 13 10:48:15 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/command/ddl.scala       | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24836be5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5554056..b1eaecb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -21,7 +21,6 @@ import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -582,8 +581,15 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
-      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+
+    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
+      try {
+        scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+      } finally {
+        evalPool.shutdown()
+      }
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -604,8 +610,6 @@ case class AlterTableRecoverPartitionsCommand(
     Seq.empty[Row]
   }
 
-  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   private def scanPartitions(
       spark: SparkSession,
       fs: FileSystem,
@@ -614,7 +618,8 @@ case class AlterTableRecoverPartitionsCommand(
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -638,7 +643,7 @@ case class AlterTableRecoverPartitionsCommand(
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring
it")



