flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/6] flink git commit: [FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts
Date Wed, 10 Feb 2016 16:52:53 GMT
[FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f40251
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f40251
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f40251

Branch: refs/heads/master
Commit: b8f40251c6c45379118254c21b0d553c2d3b8937
Parents: 9173825
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 8 14:24:43 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 10 15:26:43 2016 +0100

----------------------------------------------------------------------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 20 ++++++++++++++++++--
 .../minicluster/LocalFlinkMiniCluster.scala     |  2 ++
 .../runtime/testutils/ZooKeeperTestUtils.java   |  5 +++--
 .../runtime/testingUtils/TestingCluster.scala   |  2 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  7 ++++---
 .../kafka/KafkaTestEnvironmentImpl.java         |  7 ++++---
 ...ctTaskManagerProcessFailureRecoveryTest.java |  3 +++
 .../JobManagerCheckpointRecoveryITCase.java     |  8 ++++++--
 .../recovery/ProcessFailureCancelingITCase.java |  2 +-
 9 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 4cdda3f..0346d6d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
 
 import org.slf4j.LoggerFactory
 
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
 import scala.concurrent.forkjoin.ForkJoinPool
 
@@ -86,7 +86,7 @@ abstract class FlinkMiniCluster(
   
   implicit val executionContext = ExecutionContext.global
 
-  implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
+  implicit val timeout = AkkaUtils.getTimeout(configuration)
 
   val recoveryMode = RecoveryMode.fromConfig(configuration)
 
@@ -188,6 +188,22 @@ abstract class FlinkMiniCluster(
     AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
   }
 
+  /**
+    * Sets CI environment (Travis) specific config defaults.
+    */
+  def setDefaultCiConfig(config: Configuration) : Unit = {
+    // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables
+    if (sys.env.contains("CI")) {
+      // Only set if nothing specified in config
+      if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) {
+        val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s")
+
+        LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s")
+      }
+    }
+  }
+
   // --------------------------------------------------------------------------
   //                          Start/Stop Methods
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 913aec0..c803429 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,6 +48,8 @@ class LocalFlinkMiniCluster(
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val config = getDefaultConfig
 
+    setDefaultCiConfig(config)
+
     config.addAll(userConfiguration)
     setMemory(config)
     initializeIOFormatClasses(config)

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 6c33835..75569ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -70,9 +70,9 @@ public class ZooKeeperTestUtils {
 		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
 
 		int connTimeout = 5000;
-		if (System.getenv().get("CI") != null) {
+		if (System.getenv().containsKey("CI")) {
 			// The regular timeout is to aggressive for Travis and connections are often lost.
-			connTimeout = 20000;
+			connTimeout = 30000;
 		}
 
 		config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
@@ -87,6 +87,7 @@ public class ZooKeeperTestUtils {
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
 		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 		config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s");
 
 		return config;

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 5eee4e5..bd56040 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -63,6 +63,8 @@ class TestingCluster(
     cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
     cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
+    setDefaultCiConfig(cfg)
+
     cfg.addAll(userConfig)
     cfg
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 6f56ede..3d9e4f5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -176,8 +176,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
 		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
+		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
+		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
 		standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
 	}
@@ -292,7 +292,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
 
 		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
+		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
 
 		final int numTries = 5;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 50dcab8..9dfd021 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -175,8 +175,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
 		standardProps.setProperty("group.id", "flink-tests");
 		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
+		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
+		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
 		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
 		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
 	}
@@ -296,7 +296,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
 
 		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+		kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
+		kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
 
 		final int numTries = 5;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5dd870f..7b4c9b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
@@ -122,6 +123,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
 			jmConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s");
+			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(
@@ -376,6 +378,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+				cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
 				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, TaskManager.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
index 737d39a..5d1b2c5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -156,6 +156,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
 		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
 				.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
 		ActorSystem testSystem = null;
 		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
@@ -182,7 +183,8 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
 			leaderRetrievalService.start(leaderListener);
 
 			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+			taskManagerSystem = AkkaUtils.createActorSystem(
+					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
 			TaskManager.startTaskManagerComponentsAndActor(
 					config, taskManagerSystem, "localhost",
 					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
@@ -297,6 +299,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
 				fileStateBackendPath);
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
 		JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
 		LeaderRetrievalService leaderRetrievalService = null;
@@ -321,7 +324,8 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
 			leaderRetrievalService.start(leaderListener);
 
 			// The task manager
-			taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+			taskManagerSystem = AkkaUtils.createActorSystem(
+					config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
 			TaskManager.startTaskManagerComponentsAndActor(
 					config, taskManagerSystem, "localhost",
 					Option.<String>empty(), Option.<LeaderRetrievalService>empty(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 37e4e38..2b49c08 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -97,7 +97,7 @@ public class ProcessFailureCancelingITCase {
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s");
 			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s");
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10);
-			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
+			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(


Mime
View raw message