spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Date Thu, 26 Nov 2015 07:35:23 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b1fcefca6 -> 7900d192e


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

In the previous codes, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong.
`SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue`
to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most
`maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192

Branch: refs/heads/branch-1.5
Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764
Parents: b1fcefc
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Wed Nov 25 23:31:53 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/ThreadUtils.scala     | 14 ++++--
 .../apache/spark/util/ThreadUtilsSuite.scala    | 45 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 06976f8..3159ef7 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10, TimeUnit.SECONDS)
+        }
+      })
+
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 1)
+
+      latch.countDown()
+      eventually(timeout(10.seconds)) {
+        // All threads should be stopped after keepAliveSeconds
+        assert(cachedThreadPool.getActiveCount === 0)
+        assert(cachedThreadPool.getPoolSize === 0)
+      }
+    } finally {
+      cachedThreadPool.shutdownNow()
+    }
+  }
+
   test("sameThread") {
     val callerThreadName = Thread.currentThread().getName()
     val f = Future {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message