flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/9] flink git commit: [FLINK-1584] [tests] Move logic for failure detection test cluster into test classes
Date Thu, 19 Feb 2015 19:53:28 GMT
[FLINK-1584] [tests] Move logic for failure detection test cluster into test classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbf589ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbf589ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbf589ad

Branch: refs/heads/master
Commit: dbf589ad3b1f2061eb83112f184fd43d14908d3b
Parents: 52b40de
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Feb 19 18:50:16 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 19 18:54:14 2015 +0100

----------------------------------------------------------------------
 .../test/util/ForkableFlinkMiniCluster.scala    | 20 +++-----------------
 .../jobmanager/JobManagerFailsITCase.scala      | 10 +++++++++-
 .../taskmanager/TaskManagerFailsITCase.scala    | 17 ++++++++++++++---
 3 files changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbf589ad/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 38095b9..fdb13b2 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -115,27 +115,13 @@ object ForkableFlinkMiniCluster {
 
   import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
 
-  def startClusterDeathWatch(numSlots: Int, numTaskManagers: Int,
-                                    timeout: String = DEFAULT_AKKA_ASK_TIMEOUT):
-  ForkableFlinkMiniCluster = {
+  def startCluster(numSlots: Int,
+                   numTaskManagers: Int,
+                   timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): ForkableFlinkMiniCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
     config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
-    config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
-
-    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
-  }
-
-  def startCluster(numSlots: Int, numTaskManagers: Int, timeout: String = DEFAULT_AKKA_ASK_TIMEOUT):
-  ForkableFlinkMiniCluster = {
-    val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
-    config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000)
-    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
     new ForkableFlinkMiniCluster(config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbf589ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1e97f7b..66b2438 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
 
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
@@ -42,7 +43,14 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     "detect a lost connection to the JobManager and try to reconnect to it" in {
       val num_slots = 11
 
-      val cluster = ForkableFlinkMiniCluster.startClusterDeathWatch(num_slots, 1)
+      val config = new Configuration()
+      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots)
+      config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
+      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+      config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
+
+      val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
 
       val tm = cluster.getTaskManagers(0)
       val jm = cluster.getJobManager

http://git-wip-us.apache.org/repos/asf/flink/blob/dbf589ad/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 5ae464f..a6ee119 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,6 +20,9 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
@@ -27,6 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, Re
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
+
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -46,7 +50,14 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     "detect a failing task manager" in {
       val num_slots = 11
 
-      val cluster = ForkableFlinkMiniCluster.startClusterDeathWatch(num_slots, 2)
+      val config = new Configuration()
+      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots)
+      config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2)
+      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
+      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+      config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
+
+      val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
 
       val taskManagers = cluster.getTaskManagers
       val jm = cluster.getJobManager
@@ -65,7 +76,8 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
           jm ! RequestNumberRegisteredTaskManager
           expectMsg(1)
         }
-      }finally{
+      }
+      finally {
         cluster.stop()
       }
 
@@ -144,5 +156,4 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
       }
     }
   }
-
 }


Mime
View raw message