flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [61/82] [abbrv] incubator-flink git commit: Fixed JobManagerITCase to properly wait for task managers to deregister their tasks. Replaced the scheduler's execution service with akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniCluster...
Date Thu, 18 Dec 2014 18:45:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 6689f93..0e28ab6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -25,9 +25,8 @@ import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph,
 AbstractJobVertex}
 import Tasks._
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingJobManagerMessages}
-import TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound,
-ResponseExecutionGraph, RequestExecutionGraph}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.{TestingUtils}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -68,461 +67,387 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           expectNoMsg()
         }
 
-        val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-          RequestExecutionGraph(jobGraph.getJobID)) match {
-          case ExecutionGraphFound(_, executionGraph) => executionGraph
-          case ExecutionGraphNotFound(jobID) => fail(s"The execution graph for job ID ${jobID} " +
-            s"was not retrievable.")
-        }
-
-        executionGraph.getRegisteredExecutions.size should equal(0)
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
       } finally {
         cluster.stop()
       }
     }
 
-        "support immediate scheduling of a single vertex" in {
-          val num_tasks = 133
-          val vertex = new AbstractJobVertex("Test Vertex")
-          vertex.setParallelism(num_tasks)
-          vertex.setInvokableClass(classOf[NoOpInvokable])
-
-          val jobGraph = new JobGraph("Test Job", vertex)
-
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+    "support immediate scheduling of a single vertex" in {
+      val num_tasks = 133
+      val vertex = new AbstractJobVertex("Test Vertex")
+      vertex.setParallelism(num_tasks)
+      vertex.setInvokableClass(classOf[NoOpInvokable])
 
-          try {
-            val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
-            availableSlots should equal(num_tasks)
+      val jobGraph = new JobGraph("Test Job", vertex)
 
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              val result = expectMsgType[JobResultSuccess]
+      try {
+        val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
+        availableSlots should equal(num_tasks)
 
-              result.jobID should equal(jobGraph.getJobID)
-            }
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          val result = expectMsgType[JobResultSuccess]
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          result.jobID should equal(jobGraph.getJobID)
         }
 
-        "support queued scheduling of a single vertex" in {
-          val num_tasks = 111
-
-          val vertex = new AbstractJobVertex("Test Vertex")
-          vertex.setParallelism(num_tasks)
-          vertex.setInvokableClass(classOf[NoOpInvokable])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          val jobGraph = new JobGraph("Test job", vertex)
-          jobGraph.setAllowQueuedScheduling(true)
+    "support queued scheduling of a single vertex" in {
+      val num_tasks = 111
 
-          val cluster = TestingUtils.startTestingCluster(10)
-          val jm = cluster.getJobManager
+      val vertex = new AbstractJobVertex("Test Vertex")
+      vertex.setParallelism(num_tasks)
+      vertex.setInvokableClass(classOf[NoOpInvokable])
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Test job", vertex)
+      jobGraph.setAllowQueuedScheduling(true)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
+      val cluster = TestingUtils.startTestingCluster(10)
+      val jm = cluster.getJobManager
 
-              val result = expectMsgType[JobResultSuccess]
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-              result.jobID should equal(jobGraph.getJobID)
-            }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+          val result = expectMsgType[JobResultSuccess]
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          result.jobID should equal(jobGraph.getJobID)
         }
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-        "support forward jobs" in {
-          val num_tasks = 31
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[Receiver])
-
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "support forward jobs" in {
+      val num_tasks = 31
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
+      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+      val jm = cluster.getJobManager
 
-              val result = expectMsgType[JobResultSuccess]
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-              result.jobID should equal(jobGraph.getJobID)
-            }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+          val result = expectMsgType[JobResultSuccess]
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          result.jobID should equal(jobGraph.getJobID)
         }
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-        "support bipartite job" in {
-          val num_tasks = 31
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[AgnosticReceiver])
-
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+    "support bipartite job" in {
+      val num_tasks = 31
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[AgnosticReceiver])
 
-          val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultSuccess]
-            }
+      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultSuccess]
         }
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-        "support two input job failing edge mismatch" in {
-          val num_tasks = 11
-          val sender1 = new AbstractJobVertex("Sender1")
-          val sender2 = new AbstractJobVertex("Sender2")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender1.setInvokableClass(classOf[Sender])
-          sender2.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[AgnosticReceiver])
-
-          sender1.setParallelism(num_tasks)
-          sender2.setParallelism(2 * num_tasks)
-          receiver.setParallelism(3 * num_tasks)
+    "support two input job failing edge mismatch" in {
+      val num_tasks = 1
+      val sender1 = new AbstractJobVertex("Sender1")
+      val sender2 = new AbstractJobVertex("Sender2")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-          receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      sender1.setInvokableClass(classOf[Sender])
+      sender2.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[AgnosticReceiver])
 
-          val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      sender1.setParallelism(num_tasks)
+      sender2.setParallelism(2 * num_tasks)
+      receiver.setParallelism(3 * num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
+      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
 
-        "support two input job" in {
-          val num_tasks = 11
-          val sender1 = new AbstractJobVertex("Sender1")
-          val sender2 = new AbstractJobVertex("Sender2")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender1.setInvokableClass(classOf[Sender])
-          sender2.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender1.setParallelism(num_tasks)
-          sender2.setParallelism(2 * num_tasks)
-          receiver.setParallelism(3 * num_tasks)
+    "support two input job" in {
+      val num_tasks = 11
+      val sender1 = new AbstractJobVertex("Sender1")
+      val sender2 = new AbstractJobVertex("Sender2")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-          receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      sender1.setInvokableClass(classOf[Sender])
+      sender2.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
 
-          val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
+      sender1.setParallelism(num_tasks)
+      sender2.setParallelism(2 * num_tasks)
+      receiver.setParallelism(3 * num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
+      val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
-              expectMsgType[JobResultSuccess]
-            }
+      val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          expectMsgType[JobResultSuccess]
         }
 
-        "handle job with a failing sender vertex" in {
-          val num_tasks = 100
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[ExceptionSender])
-          receiver.setInvokableClass(classOf[Receiver])
-
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+    "handle job with a failing sender vertex" in {
+      val num_tasks = 100
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      sender.setInvokableClass(classOf[ExceptionSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
-            }
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
         }
 
-        "handle job with an occasionally failing sender vertex" in {
-          val num_tasks = 100
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[SometimesExceptionSender])
-          receiver.setInvokableClass(classOf[Receiver])
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
+        }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+    "handle job with an occasionally failing sender vertex" in {
+      val num_tasks = 100
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
+      sender.setInvokableClass(classOf[SometimesExceptionSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
-            }
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
         }
 
-        "handle job with a failing receiver vertex" in {
-          val num_tasks = 200
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
+        }
 
-          sender.setInvokableClass(classOf[Sender])
-          receiver.setInvokableClass(classOf[ExceptionReceiver])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "handle job with a failing receiver vertex" in {
+      val num_tasks = 200
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[Sender])
+      receiver.setInvokableClass(classOf[ExceptionReceiver])
 
-          val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
+      val jm = cluster.getJobManager
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
 
-        "handle job with all vertices failing during instantiation" in {
-          val num_tasks = 200
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
-
-          sender.setInvokableClass(classOf[InstantiationErrorSender])
-          receiver.setInvokableClass(classOf[Receiver])
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "handle job with all vertices failing during instantiation" in {
+      val num_tasks = 200
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[InstantiationErrorSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
 
-        "handle job with some vertices failing during instantiation" in {
-          val num_tasks = 200
-          val sender = new AbstractJobVertex("Sender")
-          val receiver = new AbstractJobVertex("Receiver")
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
 
-          sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
-          receiver.setInvokableClass(classOf[Receiver])
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
 
-          sender.setParallelism(num_tasks)
-          receiver.setParallelism(num_tasks)
+    "handle job with some vertices failing during instantiation" in {
+      val num_tasks = 200
+      val sender = new AbstractJobVertex("Sender")
+      val receiver = new AbstractJobVertex("Receiver")
 
-          receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      sender.setInvokableClass(classOf[SometimesInstantiationErrorSender])
+      receiver.setInvokableClass(classOf[Receiver])
 
-          val jobGraph = new JobGraph("Pointwise job", sender, receiver)
+      sender.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
 
-          val cluster = TestingUtils.startTestingCluster(num_tasks)
-          val jm = cluster.getJobManager
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
 
-          try {
-            within(TestingUtils.TESTING_DURATION) {
-              jm ! RequestTotalNumberOfSlots
-              expectMsg(num_tasks)
+      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
-              jm ! SubmitJob(jobGraph)
-              expectMsg(SubmissionSuccess(jobGraph.getJobID))
-              expectMsgType[JobResultFailed]
-            }
+      val cluster = TestingUtils.startTestingCluster(num_tasks)
+      val jm = cluster.getJobManager
 
-            val executionGraph = AkkaUtils.ask[ResponseExecutionGraph](jm,
-              RequestExecutionGraph(jobGraph.getJobID)) match {
-              case ExecutionGraphFound(_, eg) => eg
-              case ExecutionGraphNotFound(jobID) =>
-                fail(s"The execution graph for job ID ${jobID} was not retrievable.")
-            }
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! RequestTotalNumberOfSlots
+          expectMsg(num_tasks)
 
-            executionGraph.getRegisteredExecutions.size should equal(0)
-          } finally {
-            cluster.stop()
-          }
+          jm ! SubmitJob(jobGraph)
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsgType[JobResultFailed]
         }
+
+        jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+        expectMsg(true)
+      } finally {
+        cluster.stop()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9961ada..5a51265 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -44,7 +44,7 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
 
   override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
     val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, true)
+      TaskManager.parseConfiguration(HOSTNAME, configuration, true)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 9782b72..67a8934 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,15 +19,18 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorRef, Props}
+import akka.pattern.{ask, pipe}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
-WaitForAllVerticesToBeRunning, ExecutionGraphFound, RequestExecutionGraph}
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
+ExecutionStateChanged}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved
 
 import scala.collection.convert.WrapAsScala
+import scala.concurrent.{Await, Future}
 
 
 trait TestingJobManager extends ActorLogMessages with WrapAsScala {
@@ -72,6 +75,22 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
       if(cleanup){
         waitForAllVerticesToBeRunning.remove(jobID)
       }
+    case NotifyWhenJobRemoved(jobID) => {
+      // Ask every registered task manager to reply once it runs no task of this job anymore.
+      val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
+
+      val responses = tms.map{
+        tm =>
+          (tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean]
+      }
+
+      import context.dispatcher
+
+      // Fold the replies without blocking and pipe the outcome to the requester. Await inside
+      // receive would stall this actor until all task managers answered, and a failed or timed
+      // out ask would throw here instead of reaching the requester as a failure message.
+      Future.fold(responses)(true)(_ & _) pipeTo sender()
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 3b34955..7941226 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -37,4 +37,5 @@ object TestingJobManagerMessages {
   case class WaitForAllVerticesToBeRunning(jobID: JobID)
   case class AllVerticesRunning(jobID: JobID)
 
+  case class NotifyWhenJobRemoved(jobID: JobID)
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 5c6cca1..31a43cb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.{ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
+import scala.concurrent.duration._
 
 trait TestingTaskManager extends ActorLogMessages {
-  self: TaskManager =>
+  that: TaskManager =>
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
 
   abstract override def receiveWithLogMessages = {
     receiveTestMessages orElse super.receiveWithLogMessages
@@ -51,7 +55,32 @@ trait TestingTaskManager extends ActorLogMessages {
         case None =>
       }
     case RequestBroadcastVariablesWithReferences => {
-      sender() ! ResponseBroadcastVariablesWithReferences(bcVarManager.getNumberOfVariablesWithReferences)
+      sender() ! ResponseBroadcastVariablesWithReferences(
+        bcVarManager.getNumberOfVariablesWithReferences)
+    }
+    case NotifyWhenJobRemoved(jobID) => {
+      if(runningTasks.values.exists(_.getJobID == jobID)){
+        // A task of the job still runs here: remember the requester and check again in 200 ms.
+        val set = waitForJobRemoval.getOrElse(jobID, Set())
+        waitForJobRemoval += (jobID -> (set + sender()))
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
+      }else{
+        // Nothing of the job runs here: answer at once and drop listeners stored earlier so
+        // that none of them can be notified a second time.
+        val listeners = waitForJobRemoval.remove(jobID).getOrElse(Set[ActorRef]())
+        (listeners + sender()) foreach (_ ! true)
+      }
+    }
+    case CheckIfJobRemoved(jobID) => {
+      if(runningTasks.values.forall(_.getJobID != jobID)){
+        // remove instead of get: every request schedules its own check, so a kept entry would
+        // send each listener one extra reply per additional check.
+        waitForJobRemoval.remove(jobID).foreach(_ foreach (_ ! true))
+      }else{
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 24d7e5c..cb5282e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.testingUtils
 
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.Task
 
 object TestingTaskManagerMessages{
   case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
   case object RequestRunningTasks
   case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
     import collection.JavaConverters._
@@ -30,4 +32,6 @@ object TestingTaskManagerMessages{
   }
   case object RequestBroadcastVariablesWithReferences
   case class ResponseBroadcastVariablesWithReferences(number: Int)
+
+  case class CheckIfJobRemoved(jobID: JobID)
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index df0915e..dddef4a 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -208,7 +208,7 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-			
+
 			<plugin>
 				<groupId>org.scalastyle</groupId>
 				<artifactId>scalastyle-maven-plugin</artifactId>
@@ -232,7 +232,7 @@ under the License.
 					<outputEncoding>UTF-8</outputEncoding>
 				</configuration>
 			</plugin>
-			
+
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index d28e9dd..dcab0b8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 19c2f90..ce19a65 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index f25dd6c..8685cc5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7e9e4e5..4f8f632 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 171db60..539f96c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 40071b7..5468637 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index f5ab3ba..06e40d8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index d77318e..22dafda 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -80,4 +80,138 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+						<compilerPlugin>
+							<groupId>org.scalamacros</groupId>
+							<artifactId>paradise_${scala.version}</artifactId>
+							<version>${scala.macros.version}</version>
+						</compilerPlugin>
+					</compilerPlugins>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b73f961..c30d976 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -46,7 +46,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
@@ -68,7 +67,7 @@ public abstract class AbstractTestBase {
 
 	protected final Configuration config;
 	
-	protected TestingCluster executor;
+	protected ForkableFlinkMiniCluster executor;
 
 	private final List<File> tempFiles;
 
@@ -97,16 +96,15 @@ public abstract class AbstractTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	public void startCluster() throws Exception {
-		Thread.sleep(250);
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 		config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
-		this.executor = new TestingCluster(config);
+		this.executor = new ForkableFlinkMiniCluster(config);
 	}
-	
+
 	public void stopCluster() throws Exception {
 		try {
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 83dd73b..03f60b3 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -31,7 +31,6 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.CollectionEnvironment;
@@ -194,12 +193,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	private static final class TestEnvironment extends ExecutionEnvironment {
 
-		private final FlinkMiniCluster executor;
+		private final ForkableFlinkMiniCluster executor;
 
 		private JobExecutionResult latestResult;
 		
 		
-		private TestEnvironment(FlinkMiniCluster executor, int degreeOfParallelism) {
+		private TestEnvironment(ForkableFlinkMiniCluster executor, int degreeOfParallelism) {
 			this.executor = executor;
 			setDegreeOfParallelism(degreeOfParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
new file mode 100644
index 0000000..f82a4a6
--- /dev/null
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import akka.actor.{Props, ActorSystem, ActorRef}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingTaskManager
+
+/**
+ * [[LocalFlinkMiniCluster]] whose job manager and task manager ports are derived from the
+ * "forkNumber" system property when it holds an integer other than -1 (presumably set per
+ * forked test JVM to avoid port clashes). Task managers get the [[TestingTaskManager]] mix-in.
+ */
+class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
+LocalFlinkMiniCluster(userConfiguration) {
+
+  /** Clones the user configuration, sets the fork specific ports and delegates to super. */
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val forkProperty = System.getProperty("forkNumber")
+    // A missing or non-numeric property makes parseInt throw, which means "no fork number".
+    val parsedFork = try {
+      Some(Integer.parseInt(forkProperty))
+    }catch{
+      case _: NumberFormatException => None
+    }
+
+    val forkConfig = userConfiguration.clone()
+    // -1 is treated like an absent fork number; in that case no port is overridden.
+    for(fork <- parsedFork if fork != -1){
+      val basePort = 1024 + fork*300
+      forkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, basePort)
+      forkConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, basePort + 100)
+      forkConfig.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, basePort + 200)
+    }
+    super.generateConfiguration(forkConfig)
+  }
+
+  /** Starts the task manager actor with the given index; positive ports are offset by index. */
+  override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
+    val tmConfig = configuration.clone()
+    // Shift each configured port by index; ports that are not positive are left unchanged.
+    val portKeys = Seq(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY -> ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT,
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY -> ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+    for((key, defaultPort) <- portKeys){
+      val port = tmConfig.getInteger(key, defaultPort)
+      if(port > 0){
+        tmConfig.setInteger(key, port + index)
+      }
+    }
+
+    val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) =
+      TaskManager.parseConfiguration(HOSTNAME, tmConfig, false)
+
+    val taskManagerProps = Props(new TaskManager(connectionInfo, jobManagerAkkaURL,
+      taskManagerConfig, networkConnectionConfig) with TestingTaskManager)
+    system.actorOf(taskManagerProps, TaskManager.TASK_MANAGER_NAME + index)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 6347cb5..303ee3d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
 
 import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public abstract class CancellingTestBase {
 
 	// --------------------------------------------------------------------------------------------
 	
-	protected LocalFlinkMiniCluster executor;
+	protected ForkableFlinkMiniCluster executor;
 
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
@@ -87,7 +88,7 @@ public abstract class CancellingTestBase {
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 
-		this.executor = new LocalFlinkMiniCluster(config);
+		this.executor = new ForkableFlinkMiniCluster(config);
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index 4bd3fc7..9046d2d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -24,8 +24,8 @@ import java.io.FileWriter;
 import org.apache.flink.client.RemoteExecutor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class PackagedProgramEndToEndITCase {
 
 	@Test
 	public void testEverything() {
-		LocalFlinkMiniCluster cluster = null;
+		ForkableFlinkMiniCluster cluster = null;
 
 		File points = null;
 		File clusters = null;
@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-			cluster = new LocalFlinkMiniCluster(config);
+			cluster = new ForkableFlinkMiniCluster(config);
 
 			RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index e8b716b..06013b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.test.util;
 
 import akka.actor.ActorRef;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 
 import org.apache.flink.runtime.client.JobClient;
@@ -120,7 +119,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		// reference to the timeout thread
 		private final Thread timeoutThread;
 		// cluster to submit the job to.
-		private final FlinkMiniCluster executor;
+		private final ForkableFlinkMiniCluster executor;
 		// job graph of the failing job (submitted first)
 		private final JobGraph failingJob;
 		// job graph of the working job (submitted after return from failing job)
@@ -129,8 +128,8 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		private volatile Exception error;
 		
 
-		public SubmissionThread(Thread timeoutThread, FlinkMiniCluster executor, JobGraph failingJob,
-								JobGraph job) {
+		public SubmissionThread(Thread timeoutThread, ForkableFlinkMiniCluster executor, JobGraph
+				failingJob,	JobGraph job) {
 			this.timeoutThread = timeoutThread;
 			this.executor = executor;
 			this.failingJob = failingJob;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index a923af6..4024304 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index 60651d1..8ab41ec 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 1155d73..587bbf3 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,


Mime
View raw message