spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tgra...@apache.org
Subject spark git commit: SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be...
Date Thu, 18 Dec 2014 18:19:21 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3b764699f -> 253b72b56


SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be...

... changed to a time period

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3471 from sryza/sandy-spark-3779 and squashes the following commits:

20b9887 [Sandy Ryza] Deprecate old property
42b5df7 [Sandy Ryza] Review feedback
9a959a1 [Sandy Ryza] SPARK-3779. yarn spark.yarn.applicationMaster.waitTries config should be changed to a time period


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/253b72b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/253b72b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/253b72b5

Branch: refs/heads/master
Commit: 253b72b56fe908bbab5d621eae8a5f359c639dfd
Parents: 3b76469
Author: Sandy Ryza <sandy@cloudera.com>
Authored: Thu Dec 18 12:19:07 2014 -0600
Committer: Thomas Graves <tgraves@apache.org>
Committed: Thu Dec 18 12:19:07 2014 -0600

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |  8 ++--
 .../spark/deploy/yarn/ApplicationMaster.scala   | 48 ++++++++++----------
 2 files changed, 29 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/253b72b5/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index b5fb077..86276b1 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -22,10 +22,12 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
-  <td><code>spark.yarn.applicationMaster.waitTries</code></td>
-  <td>10</td>
+  <td><code>spark.yarn.am.waitTime</code></td>
+  <td>100000</td>
   <td>
-    Set the number of times the ApplicationMaster waits for the the Spark master and then also the number of tries it waits for the SparkContext to be initialized
+    In yarn-cluster mode, time in milliseconds for the application master to wait for the
+    SparkContext to be initialized. In yarn-client mode, time for the application master to wait
+    for the driver to connect to it.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/253b72b5/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 987b337..dc7a078 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -329,43 +329,43 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   private def waitForSparkContextInitialized(): SparkContext = {
     logInfo("Waiting for spark context initialization")
-    try {
-      sparkContextRef.synchronized {
-        var count = 0
-        val waitTime = 10000L
-        val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
-        while (sparkContextRef.get() == null && count < numTries && !finished) {
-          logInfo("Waiting for spark context initialization ... " + count)
-          count = count + 1
-          sparkContextRef.wait(waitTime)
-        }
+    sparkContextRef.synchronized {
+      val waitTries = sparkConf.getOption("spark.yarn.applicationMaster.waitTries")
+        .map(_.toLong * 10000L)
+      if (waitTries.isDefined) {
+        logWarning(
+          "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
+      }
+      val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L))
+      val deadline = System.currentTimeMillis() + totalWaitTime
 
-        val sparkContext = sparkContextRef.get()
-        if (sparkContext == null) {
-          logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
-            + " log output for errors. Failing the application.").format(numTries * waitTime))
-        }
-        sparkContext
+      while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
+        logInfo("Waiting for spark context initialization ... ")
+        sparkContextRef.wait(10000L)
+      }
+
+      val sparkContext = sparkContextRef.get()
+      if (sparkContext == null) {
+        logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
+          + " log output for errors. Failing the application.").format(totalWaitTime))
       }
+      sparkContext
     }
   }
 
   private def waitForSparkDriver(): ActorRef = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
-    var count = 0
     val hostport = args.userArgs(0)
     val (driverHost, driverPort) = Utils.parseHostPort(hostport)
 
-    // spark driver should already be up since it launched us, but we don't want to
+    // Spark driver should already be up since it launched us, but we don't want to
     // wait forever, so wait 100 seconds max to match the cluster mode setting.
-    // Leave this config unpublished for now. SPARK-3779 to investigating changing
-    // this config to be time based.
-    val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
+    val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L)
+    val deadline = System.currentTimeMillis + totalWaitTime
 
-    while (!driverUp && !finished && count < numTries) {
+    while (!driverUp && !finished && System.currentTimeMillis < deadline) {
       try {
-        count = count + 1
         val socket = new Socket(driverHost, driverPort)
         socket.close()
         logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
@@ -374,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         case e: Exception =>
           logError("Failed to connect to driver at %s:%s, retrying ...".
             format(driverHost, driverPort))
-          Thread.sleep(100)
+          Thread.sleep(100L)
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message