spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject spark git commit: [SPARK-17511] Yarn Dynamic Allocation: Avoid marking released container as Failed
Date Wed, 14 Sep 2016 19:34:42 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6fe5972e6 -> fab77dadf


[SPARK-17511] Yarn Dynamic Allocation: Avoid marking released container as Failed

Due to race conditions, the assertion `assert(numExecutorsRunning <= targetNumExecutors)` can fail,
causing an `AssertionError`. The assertion has therefore been removed; instead, the conditional
check is performed before launching a new container:
```
java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1.org$apache$spark$deploy$yarn$YarnAllocator$$anonfun$$updateInternalState$1(YarnAllocator.scala:489)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1$$anon$1.run(YarnAllocator.scala:519)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```
This was manually tested using a large ForkAndJoin job with Dynamic Allocation enabled to
validate that the previously failing job now succeeds, without any such exception.

Author: Kishor Patil <kpatil@yahoo-inc.com>

Closes #15069 from kishorvpatil/SPARK-17511.

(cherry picked from commit ff6e4cbdc80e2ad84c5d70ee07f323fad9374e3e)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fab77dad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fab77dad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fab77dad

Branch: refs/heads/branch-2.0
Commit: fab77dadf70d011cec8976acfe8c851816f82426
Parents: 6fe5972
Author: Kishor Patil <kpatil@yahoo-inc.com>
Authored: Wed Sep 14 14:19:35 2016 -0500
Committer: Tom Graves <tgraves@yahoo-inc.com>
Committed: Wed Sep 14 14:33:40 2016 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 68 +++++++++++---------
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 19 ++++++
 2 files changed, 55 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fab77dad/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1b80071..b321901 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -483,7 +483,6 @@ private[yarn] class YarnAllocator(
 
       def updateInternalState(): Unit = synchronized {
         numExecutorsRunning += 1
-        assert(numExecutorsRunning <= targetNumExecutors)
         executorIdToContainer(executorId) = container
         containerIdToExecutorId(container.getId) = executorId
 
@@ -493,39 +492,44 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (launchContainers) {
-        logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
-          driverUrl, executorHostname))
-
-        launcherPool.execute(new Runnable {
-          override def run(): Unit = {
-            try {
-              new ExecutorRunnable(
-                container,
-                conf,
-                sparkConf,
-                driverUrl,
-                executorId,
-                executorHostname,
-                executorMemory,
-                executorCores,
-                appAttemptId.getApplicationId.toString,
-                securityMgr,
-                localResources
-              ).run()
-              updateInternalState()
-            } catch {
-              case NonFatal(e) =>
-                logError(s"Failed to launch executor $executorId on container $containerId", e)
-                // Assigned container should be released immediately to avoid unnecessary resource
-                // occupation.
-                amClient.releaseAssignedContainer(containerId)
+      if (numExecutorsRunning < targetNumExecutors) {
+        if (launchContainers) {
+          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
+            driverUrl, executorHostname))
+
+          launcherPool.execute(new Runnable {
+            override def run(): Unit = {
+              try {
+                new ExecutorRunnable(
+                  container,
+                  conf,
+                  sparkConf,
+                  driverUrl,
+                  executorId,
+                  executorHostname,
+                  executorMemory,
+                  executorCores,
+                  appAttemptId.getApplicationId.toString,
+                  securityMgr,
+                  localResources
+                ).run()
+                updateInternalState()
+              } catch {
+                case NonFatal(e) =>
+                  logError(s"Failed to launch executor $executorId on container $containerId", e)
+                  // Assigned container should be released immediately to avoid unnecessary resource
+                  // occupation.
+                  amClient.releaseAssignedContainer(containerId)
+              }
             }
-          }
-        })
+          })
+        } else {
+          // For test only
+          updateInternalState()
+        }
       } else {
-        // For test only
-        updateInternalState()
+        logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
+          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fab77dad/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 207dbf5..f8351c0 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -136,6 +136,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     size should be (0)
   }
 
+  test("container should not be created if requested number if met") {
+    // request a single container and receive it
+    val handler = createAllocator(1)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container2))
+    handler.getNumExecutorsRunning should be (1)
+  }
+
   test("some containers allocated") {
     // request a few containers and receive some of them
     val handler = createAllocator(4)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message