Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AD5FC18D64 for ; Mon, 8 Feb 2016 23:06:08 +0000 (UTC) Received: (qmail 34145 invoked by uid 500); 8 Feb 2016 23:06:05 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 33928 invoked by uid 500); 8 Feb 2016 23:06:05 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 33866 invoked by uid 99); 8 Feb 2016 23:06:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Feb 2016 23:06:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4A0FFE56E3; Mon, 8 Feb 2016 23:06:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Mon, 08 Feb 2016 23:06:07 -0000 Message-Id: <9cee4890e2654b76b8f5a45eccc4beb2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/9] flink git commit: [FLINK-3361] [jobmanager] Fix error messages about execution delay and max heartbeat pause [FLINK-3361] [jobmanager] Fix error messages about execution delay and max heartbeat pause Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af3e6890 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af3e6890 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af3e6890 Branch: refs/heads/master Commit: af3e689010f3d8e08b6ff70dd5eb6d45429d4981 Parents: 4cc4f60 Author: Stephan Ewen Authored: Mon Feb 8 11:46:35 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 8 16:57:57 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 28 +++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af3e6890/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4410ec3..bc7c134 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -25,8 +25,10 @@ import java.util.UUID import akka.actor.Status.Failure import akka.actor._ import akka.pattern.ask + import grizzled.slf4j.Logger -import org.apache.flink.api.common.{ApplicationID, ExecutionConfig, JobID} + +import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.accumulators.AccumulatorSnapshot @@ -61,6 +63,7 @@ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils} + import org.jboss.netty.channel.ChannelException import scala.annotation.tailrec @@ -1712,9 +1715,9 @@ object JobManager { maxSleepBetweenRetries : Long = 0 ) : scala.util.Try[T] = { - def sleepBeforeRetry : Unit = { + def sleepBeforeRetry() : Unit = { if (maxSleepBetweenRetries > 0) { - val sleepTime = ((Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]) + val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long] LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.") Thread.sleep(sleepTime) } @@ -1728,7 +1731,7 @@ object JobManager { scala.util.Failure(new RuntimeException( "Unable to do further retries starting the actor system")) } else { - sleepBeforeRetry + sleepBeforeRetry() retryOnBindException(fn, stopCond) } case scala.util.Failure(x: Exception) => x.getCause match { @@ -1737,7 +1740,7 @@ object JobManager { scala.util.Failure(new RuntimeException( "Unable to do further retries starting the actor system")) } else { - sleepBeforeRetry + sleepBeforeRetry() retryOnBindException(fn, stopCond) } case _ => scala.util.Failure(x) @@ -2044,14 +2047,21 @@ object JobManager { ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) val delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, pauseString) - + val delayBetweenRetries: Long = try { Duration(delayString).toMillis } catch { - case n: NumberFormatException => throw new Exception( - s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " + - s"$pauseString. Value must be a valid duration (such as 100 milli or 1 min)"); + case n: NumberFormatException => + if (delayString.equals(pauseString)) { + throw new Exception( + s"Invalid config value for ${ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE}: " + + s"$pauseString. Value must be a valid duration (such as '10 s' or '1 min')") + } else { + throw new Exception( + s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " + + s"$delayString. Value must be a valid duration (such as '100 milli' or '10 s')") + } } val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())