flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] flink git commit: [FLINK-1668] [core] Add a config option to specify delays between restarts
Date Mon, 09 Mar 2015 19:27:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0b15bc3c5 -> 0df5601ad


[FLINK-1668] [core] Add a config option to specify delays between restarts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abbb0a93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abbb0a93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abbb0a93

Branch: refs/heads/master
Commit: abbb0a93ca67da17197dc5372e6d95edd8149d44
Parents: 500ddff
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Mar 9 19:28:54 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java      |  8 +++++++-
 .../flink/runtime/jobmanager/JobManager.scala     | 18 +++++++++++++++---
 .../flink/runtime/jobmanager/RecoveryITCase.scala |  1 +
 .../flink/test/misc/AutoParallelismITCase.java    |  2 --
 .../ProcessFailureBatchRecoveryITCase.java        |  1 +
 .../flink/test/recovery/SimpleRecoveryITCase.java |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala        |  1 +
 .../taskmanager/TaskManagerFailsITCase.scala      |  1 +
 8 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0f42a17..028c258 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -40,6 +40,12 @@ public final class ConfigConstants {
 	 * value to 0 effectively disables fault tolerance.
 	 */
 	public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+
+	/**
+	 * Config parameter for the delay between execution retries. The value must be specified in the
+	 * notation "10 s" or "1 min" (style of Scala Finite Durations)
+	 */
+	public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
 	
 	// -------------------------------- Runtime -------------------------------
 	
@@ -339,7 +345,7 @@ public final class ConfigConstants {
 	public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
 
 	/**
-	 * Timeout for all blocking calls
+	 * Timeout for all blocking calls that look up remote actors
 	 */
 	public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e3e96e5..7ba06e7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -850,9 +850,21 @@ object JobManager {
       ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
       ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
-    val delayBetweenRetries = 2 * Duration(configuration.getString(
-      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
+    // configure the delay between execution retries.
+    // unless explicitly specifies, this is dependent on the heartbeat timeout
+    val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+                                              ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+    val delayString = configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY,
+                                              pauseString)
+
+    val delayBetweenRetries: Long = try {
+        Duration(delayString).toMillis
+      }
+      catch {
+        case n: NumberFormatException => throw new Exception(
+          s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
+            s"${pauseString}. Value must be a valid duration (such as 100 milli or 1 min)");
+      }
 
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index e7d1d83..c201d08 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -49,6 +49,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout)
     new TestingCluster(config)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index ea79a3a..8ddd7bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -56,8 +56,6 @@ public class AutoParallelismITCase {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
-		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
-
 		cluster = new ForkableFlinkMiniCluster(config, false);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index 6866fbc..cceeb47 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -125,6 +125,7 @@ public class ProcessFailureBatchRecoveryITCase {
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index df6fbba..48afce1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -51,7 +51,7 @@ public class SimpleRecoveryITCase {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+		config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
 
 		cluster = new ForkableFlinkMiniCluster(config, false);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index f9b1b4c..625ca07 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -137,6 +137,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "8000 ms")
     config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)

http://git-wip-us.apache.org/repos/asf/flink/blob/abbb0a93/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 245bcd9..659262c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -232,6 +232,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4000 ms")
     config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
     new ForkableFlinkMiniCluster(config, singleActorSystem = false)


Mime
View raw message