flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-1481] Fixes flakey JobManagerITCase which relied on non-deterministic behaviour.
Date Thu, 05 Feb 2015 16:56:34 GMT
[FLINK-1481] Fixes flakey JobManagerITCase which relied on non-deterministic behaviour.

This closes #365.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccd1fd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccd1fd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccd1fd5

Branch: refs/heads/master
Commit: 0ccd1fd5b90dbdacb579e329016a0819cd301d43
Parents: dacff90
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 5 15:48:23 2015 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 5 17:43:17 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManagerITCase.scala  |  8 ++++++++
 .../org/apache/flink/runtime/jobmanager/Tasks.scala  | 15 +++++++++++++--
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ccd1fd5/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index c2ceac3..4d8bea6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -34,6 +34,7 @@ import scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.Random
 
 @RunWith(classOf[JUnitRunner])
 class JobManagerITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with
@@ -369,6 +370,9 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       sender.setInvokableClass(classOf[SometimesExceptionSender])
       receiver.setInvokableClass(classOf[Receiver])
 
+      // set failing senders
+      SometimesExceptionSender.failingSenders = Seq.fill(10)(Random.nextInt(num_tasks)).toSet
+
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
@@ -474,6 +478,10 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
       receiver.setInvokableClass(classOf[Receiver])
 
+      // set the failing sender tasks
+      SometimesInstantiationErrorSender.failingSenders =
+        Seq.fill(10)(Random.nextInt(num_tasks)).toSet
+
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccd1fd5/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 70f085a..57a61da 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -223,7 +223,8 @@ object Tasks {
     }
 
     override def invoke(): Unit = {
-      if(Math.random() < 0.05){
+      // this only works if the TaskManager runs in the same JVM as the test case
+      if(SometimesExceptionSender.failingSenders.contains(this.getIndexInSubtaskGroup)){
         throw new Exception("Test exception")
       }else{
         val o = new Object()
@@ -232,6 +233,10 @@ object Tasks {
     }
   }
 
+  object SometimesExceptionSender {
+    var failingSenders = Set[Int](0)
+  }
+
   class ExceptionReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
       new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
@@ -253,7 +258,9 @@ object Tasks {
   }
 
   class SometimesInstantiationErrorSender extends AbstractInvokable{
-    if(Math.random < 0.05){
+
+    // this only works if the TaskManager runs in the same JVM as the test case
+    if(SometimesInstantiationErrorSender.failingSenders.contains(this.getIndexInSubtaskGroup)){
       throw new RuntimeException("Test exception in constructor")
     }
 
@@ -267,6 +274,10 @@ object Tasks {
     }
   }
 
+  object SometimesInstantiationErrorSender {
+    var failingSenders = Set[Int](0)
+  }
+
   class BlockingReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
       new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])


Mime
View raw message