flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/9] flink git commit: [FLINK-3361] [jobmanager] Fix error messages about execution delay and max heartbeat pause
Date Mon, 08 Feb 2016 23:06:07 GMT
[FLINK-3361] [jobmanager] Fix error messages about execution delay and max heartbeat pause


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af3e6890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af3e6890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af3e6890

Branch: refs/heads/master
Commit: af3e689010f3d8e08b6ff70dd5eb6d45429d4981
Parents: 4cc4f60
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 8 11:46:35 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 8 16:57:57 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 28 +++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af3e6890/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4410ec3..bc7c134 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,8 +25,10 @@ import java.util.UUID
 import akka.actor.Status.Failure
 import akka.actor._
 import akka.pattern.ask
+
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.{ApplicationID, ExecutionConfig, JobID}
+
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
@@ -61,6 +63,7 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
+
 import org.jboss.netty.channel.ChannelException
 
 import scala.annotation.tailrec
@@ -1712,9 +1715,9 @@ object JobManager {
       maxSleepBetweenRetries : Long = 0 )
     : scala.util.Try[T] = {
 
-    def sleepBeforeRetry : Unit = {
+    def sleepBeforeRetry() : Unit = {
       if (maxSleepBetweenRetries > 0) {
-        val sleepTime = ((Math.random() * maxSleepBetweenRetries).asInstanceOf[Long])
+        val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
         LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.")
         Thread.sleep(sleepTime)
       }
@@ -1728,7 +1731,7 @@ object JobManager {
           scala.util.Failure(new RuntimeException(
             "Unable to do further retries starting the actor system"))
         } else {
-          sleepBeforeRetry
+          sleepBeforeRetry()
           retryOnBindException(fn, stopCond)
         }
       case scala.util.Failure(x: Exception) => x.getCause match {
@@ -1737,7 +1740,7 @@ object JobManager {
             scala.util.Failure(new RuntimeException(
               "Unable to do further retries starting the actor system"))
           } else {
-            sleepBeforeRetry
+            sleepBeforeRetry()
             retryOnBindException(fn, stopCond)
           }
         case _ => scala.util.Failure(x)
@@ -2044,14 +2047,21 @@ object JobManager {
                                               ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
     val delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
                                               pauseString)
-
+    
     val delayBetweenRetries: Long = try {
         Duration(delayString).toMillis
       }
       catch {
-        case n: NumberFormatException => throw new Exception(
-          s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " +
-            s"$pauseString. Value must be a valid duration (such as 100 milli or 1 min)");
+        case n: NumberFormatException =>
+          if (delayString.equals(pauseString)) {
+            throw new Exception(
+              s"Invalid config value for ${ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE}: " +
+                s"$pauseString. Value must be a valid duration (such as '10 s' or '1 min')")
+          } else {
+            throw new Exception(
+              s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " +
+                s"$delayString. Value must be a valid duration (such as '100 milli' or '10 s')")
+          }
       }
 
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())


Mime
View raw message