flink-commits mailing list archives

From se...@apache.org
Subject [1/3] flink git commit: [FLINK-1925] [runtime] Splits the processing of the SubmitTask message into two phases: 1. TDD reception with eager acknowledgement and 2. TDD instantiation with a subsequent state update message.
Date Wed, 29 Apr 2015 09:50:30 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9a18e5790 -> 4cb5b7254


[FLINK-1925] [runtime] Splits the processing of the SubmitTask message into two phases:
  1. TDD reception with eager acknowledgement and
  2. TDD instantiation with a subsequent state update message.

This closes #622
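
In short, the TaskManager now does two things on SubmitTask: it validates the request cheaply and acknowledges reception eagerly, then instantiates the task asynchronously and reports the outcome via a task execution state update. The following is a minimal, illustrative Akka sketch of that flow, not the committed Flink code: the message names (SubmitTask, Acknowledge, UpdateTaskExecutionState) mirror the messages touched in the diff below, while TaskDescriptor, numberOfSlots, the string states and initialize() are hypothetical placeholders standing in for the real TaskDeploymentDescriptor handling in TaskManager.scala.

import akka.actor.{Actor, ActorSystem, Props, Status}
import scala.concurrent.Future

// Hypothetical stand-ins for the real deployment descriptor and runtime messages.
case class TaskDescriptor(executionId: String, targetSlot: Int)
case class SubmitTask(tdd: TaskDescriptor)
case object Acknowledge
case class UpdateTaskExecutionState(executionId: String, state: String)

class SketchTaskManager(numberOfSlots: Int) extends Actor {
  import context.dispatcher // execution context for the asynchronous phase

  def receive: Receive = {
    case SubmitTask(tdd) =>
      // Phase 1: cheap validation, then eager acknowledgement of the TDD reception.
      if (tdd.targetSlot < 0 || tdd.targetSlot >= numberOfSlots) {
        sender() ! Status.Failure(new Exception(s"Target slot ${tdd.targetSlot} does not exist."))
      } else {
        sender() ! Acknowledge
        // Phase 2: instantiate the task asynchronously and report the outcome
        // as a state update instead of a synchronous TaskOperationResult.
        Future {
          initialize(tdd)
          self ! UpdateTaskExecutionState(tdd.executionId, "RUNNING")
        }.recover {
          case _: Throwable =>
            self ! UpdateTaskExecutionState(tdd.executionId, "FAILED")
        }
      }

    case update: UpdateTaskExecutionState =>
      // The real TaskManager forwards this to the JobManager, which switches
      // the Execution attempt to RUNNING (new case in ExecutionGraph below).
      println(s"state update: $update")
  }

  // Placeholder for library registration, network setup and starting the task thread.
  private def initialize(tdd: TaskDescriptor): Unit = ()
}

object TwoPhaseSubmitSketch extends App {
  val system = ActorSystem("sketch")
  val taskManager = system.actorOf(Props(new SketchTaskManager(numberOfSlots = 2)))
  taskManager ! SubmitTask(TaskDescriptor("attempt-1", targetSlot = 0))
}

The key property is that the eager Acknowledge only confirms reception of the TDD; whether the task actually reaches RUNNING (or FAILED) is reported later through the state update. This is why ExecutionGraph now handles a RUNNING case and why the tests below assert DEPLOYING instead of RUNNING right after deployToSlot().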


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cb5b725
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cb5b725
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cb5b725

Branch: refs/heads/master
Commit: 4cb5b725447dad55dfbe66efbfc4580c51d8087d
Parents: 5a2ca81
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Apr 23 16:13:22 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Apr 29 10:40:39 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  24 +---
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +
 .../flink/runtime/messages/Messages.scala       |  16 +--
 .../flink/runtime/taskmanager/TaskManager.scala |  80 +++++++----
 .../ExecutionGraphDeploymentTest.java           |   2 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   9 +-
 .../ExecutionVertexCancelTest.java              |  12 +-
 .../ExecutionVertexDeploymentTest.java          |  37 ++---
 .../ExecutionVertexSchedulingTest.java          |   4 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 136 ++++++++++++++-----
 .../testingUtils/TestingTaskManager.scala       |  42 +++++-
 .../TestingTaskManagerMessages.scala            |   2 +
 12 files changed, 223 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index baed947..93b4f2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.ExceptionUtils;
@@ -347,26 +348,9 @@ public class Execution implements Serializable {
 						}
 					}
 					else {
-						if (success == null) {
-						markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
-						}
-
-						if (success instanceof TaskOperationResult) {
-							TaskOperationResult result = (TaskOperationResult) success;
-
-							if (!result.executionID().equals(attemptId)) {
-								markFailed(new Exception("Answer execution id does not match the request execution id."));
-							} else if (result.success()) {
-								switchToRunning();
-							} else {
-								// deployment failed :(
-								markFailed(new Exception("Failed to deploy the task " +
-										getVertexWithAttempt() + " to slot " + slot + ": " + result
-										.description()));
-							}
-						} else {
+						if (!(success.equals(Messages.getAcknowledge()))) {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot +
-									": Response was not of type TaskOperationResult"));
+									": Response was not of type Acknowledge"));
 						}
 					}
 				}
@@ -759,7 +743,7 @@ public class Execution implements Serializable {
 		}
 	}
 
-	private boolean switchToRunning() {
+	boolean switchToRunning() {
 
 		if (transitionState(DEPLOYING, RUNNING)) {
 			sendPartitionInfos();

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 08e44f9..d38913e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -601,6 +601,8 @@ public class ExecutionGraph implements Serializable {
 		Execution attempt = this.currentExecutions.get(state.getID());
 		if (attempt != null) {
 			switch (state.getExecutionState()) {
+				case RUNNING:
+					return attempt.switchToRunning();
 				case FINISHED:
 					attempt.markFinished();
 					return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
index ab8b8c2..e292a47 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -26,15 +26,7 @@ object Messages {
   /**
    * Message to signal the successful reception of another message
    */
-  case object Acknowledge {
-
-    /**
-     * Accessor for the case object instance, to simplify Java interoperability.
-     *
-     * @return The Acknowledge case object instance.
-     */
-    def get(): Acknowledge.type = this
-  }
+  case object Acknowledge
 
   /**
    * Signals that the receiver (JobManager/TaskManager) shall disconnect the sender.
@@ -49,4 +41,10 @@ object Messages {
    */
   case class Disconnect(reason: String)
 
+  /**
+   * Accessor for the case object instance, to simplify Java interoperability.
+   *
+   * @return The Acknowledge case object instance.
+   */
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 511de6b..a6b9133 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -68,6 +68,8 @@ import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
 import org.apache.flink.util.ExceptionUtils
 
+import org.slf4j.LoggerFactory
+
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
@@ -133,7 +135,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
 
   /** Registry of all tasks currently executed by this TaskManager */
-  protected val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
+  protected val runningTasks = new util.concurrent.ConcurrentHashMap[ExecutionAttemptID, Task]()
 
   /** Handler for shared broadcast variables (shared between multiple Tasks) */
   protected val bcVarManager = new BroadcastVariableManager()
@@ -380,7 +382,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       // marks a task as failed for an external reason
       // external reasons are reasons other than the task code itself throwing an exception
       case FailTask(executionID, cause) =>
-        runningTasks.get(executionID) match {
+        Option(runningTasks.get(executionID)) match {
           case Some(task) =>
 
             // execute failing operation concurrently
@@ -395,7 +397,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
       // cancels a task
       case CancelTask(executionID) =>
-        runningTasks.get(executionID) match {
+        Option(runningTasks.get(executionID)) match {
           case Some(task) =>
             // execute cancel operation concurrently
             implicit val executor = context.dispatcher
@@ -422,11 +424,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   private def handleCheckpointingMessage(message: CheckpointingMessage): Unit = {
 
     message match {
+
       case BarrierReq(attemptID, checkpointID) =>
         log.debug(s"[FT-TaskManager] Barrier $checkpointID request received " +
           s"for attempt $attemptID.")
 
-        runningTasks.get(attemptID) match {
+        Option(runningTasks.get(attemptID)) match {
           case Some(i) =>
             if (i.getExecutionState == ExecutionState.RUNNING) {
               i.getEnvironment.getInvokable match {
@@ -773,31 +776,45 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   // --------------------------------------------------------------------------
 
   /**
-   * Receives a [[TaskDeploymentDescriptor]] describing the task to be executed. Sets up a
-   * [[RuntimeEnvironment]] for the task and starts its execution in a separate thread.
+   * Receives a [[TaskDeploymentDescriptor]] describing the task to be executed. It eagerly
+   * acknowledges the task reception to the sender and asynchronously starts the initialization of
+   * the task.
    *
    * @param tdd TaskDeploymentDescriptor describing the task to be executed on this [[TaskManager]]
    */
   private def submitTask(tdd: TaskDeploymentDescriptor): Unit = {
+    val slot = tdd.getTargetSlotNumber
+
+    if (!isConnected) {
+      sender ! Failure(
+        new IllegalStateException("TaskManager is not associated with a JobManager.")
+      )
+    } else if (slot < 0 || slot >= numberOfSlots) {
+      sender ! Failure(new Exception(s"Target slot $slot does not exist on TaskManager."))
+    } else {
+      sender ! Acknowledge
+
+      Future {
+        initializeTask(tdd)
+      }(context.dispatcher)
+    }
+  }
+
+  /** Sets up a [[org.apache.flink.runtime.execution.RuntimeEnvironment]] for the task and starts
+    * its execution in a separate thread.
+    *
+    * @param tdd TaskDeploymentDescriptor describing the task to be executed on this [[TaskManager]]
+    */
+  private def initializeTask(tdd: TaskDeploymentDescriptor): Unit ={
     val jobID = tdd.getJobID
     val vertexID = tdd.getVertexID
     val executionID = tdd.getExecutionId
     val taskIndex = tdd.getIndexInSubtaskGroup
     val numSubtasks = tdd.getNumberOfSubtasks
-    val slot = tdd.getTargetSlotNumber
     var startRegisteringTask = 0L
     var task: Task = null
 
-    // all operations are in a try / catch block to make sure we send a result upon any failure
     try {
-      // check that we are already registered
-      if (!isConnected) {
-        throw new IllegalStateException("TaskManager is not associated with a JobManager")
-      }
-      if (slot < 0 || slot >= numberOfSlots) {
-        throw new Exception(s"Target slot ${slot} does not exist on TaskManager.")
-      }
-
       val userCodeClassLoader = libraryCacheManager match {
         case Some(manager) =>
           if (log.isDebugEnabled) {
@@ -807,8 +824,10 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           // triggers the download of all missing jar files from the job manager
           manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
 
+          if (log.isDebugEnabled) {
           log.debug(s"Register task $executionID at library cache manager " +
             s"took ${(System.currentTimeMillis() - startRegisteringTask) / 1000.0}s")
+          }
 
           manager.getClassLoader(jobID)
         case None => throw new IllegalStateException("There is no valid library cache manager.")
@@ -820,8 +839,8 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
       task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
         tdd.getTaskName, self)
-      
-      runningTasks.put(executionID, task) match {
+
+      Option(runningTasks.put(executionID, task)) match {
         case Some(_) => throw new RuntimeException(
           s"TaskManager contains already a task with executionID $executionID.")
         case None =>
@@ -848,7 +867,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
             opStateCarrier.injectState(tdd.getOperatorStates)
         }
       }
-      
+
       // register the task with the network stack and profiles
       log.info(s"Register task $task.")
       network.registerTask(task)
@@ -865,15 +884,13 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         throw new RuntimeException("Cannot start task. Task was canceled or failed.")
       }
 
-      sender ! new TaskOperationResult(executionID, true)
-    }
-    catch {
+      self ! UpdateTaskExecutionState(
+        new TaskExecutionState(jobID, executionID, ExecutionState.RUNNING)
+      )
+    } catch {
       case t: Throwable =>
-        val message = if (t.isInstanceOf[CancelTaskException]) {
-          "Task was canceled"
-        } else {
+        if (!t.isInstanceOf[CancelTaskException]) {
           log.error("Could not instantiate task with execution ID " + executionID, t)
-          ExceptionUtils.stringifyException(t)
         }
 
         try {
@@ -887,7 +904,9 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           case t: Throwable => log.error("Error during cleanup of task deployment.", t)
         }
 
-        sender ! new TaskOperationResult(executionID, false, message)
+        self ! UpdateTaskExecutionState(
+          new TaskExecutionState(jobID, executionID, ExecutionState.FAILED, t)
+        )
     }
   }
 
@@ -901,7 +920,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
          executionId: ExecutionAttemptID,
          partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]): Unit = {
 
-    runningTasks.get(executionId) match {
+    Option(runningTasks.get(executionId)) match {
       case Some(task) =>
 
         val errors: Seq[String] = partitionInfos.flatMap { info =>
@@ -958,7 +977,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     if (runningTasks.size > 0) {
       log.info("Cancelling all computations and discarding all cached data.")
 
-      for (t <- runningTasks.values) {
+      for (t <- runningTasks.values().asScala) {
         t.failExternally(cause)
         unregisterTaskAndNotifyFinalState(t.getExecutionId)
       }
@@ -966,7 +985,8 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   }
 
   private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {
-    runningTasks.remove(executionID) match {
+
+    Option(runningTasks.remove(executionID)) match {
       case Some(task) =>
 
         // mark the task as failed if it is not yet in a final state

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 8c4879f..42c3f84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -126,7 +126,7 @@ public class ExecutionGraphDeploymentTest {
 
 			vertex.deployToSlot(slot);
 
-			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 
 			TaskDeploymentDescriptor descr = tm.lastTDD;
 			assertNotNull(descr);

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7380b36..a1ee79b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -27,6 +27,7 @@ import java.net.InetAddress;
 import java.util.LinkedList;
 
 import akka.actor.ActorRef;
+import akka.actor.Status;
 import akka.actor.UntypedActor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
@@ -43,6 +44,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -119,7 +121,7 @@ public class ExecutionGraphTestUtils {
 				SubmitTask submitTask = (SubmitTask) msg;
 				lastTDD = submitTask.tasks();
 
-				getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(), true), getSelf());
+				getSender().tell(Messages.getAcknowledge(), getSelf());
 			} else if (msg instanceof CancelTask) {
 				CancelTask cancelTask = (CancelTask) msg;
 				getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
@@ -136,10 +138,7 @@ public class ExecutionGraphTestUtils {
 		@Override
 		public void onReceive(Object msg) throws Exception {
 			if (msg instanceof SubmitTask) {
-				SubmitTask submitTask = (SubmitTask) msg;
-
-				getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(),
-						false, ERROR_MESSAGE),	getSelf());
+				getSender().tell(new Status.Failure(new Exception(ERROR_MESSAGE)),	getSelf());
 			} else if (msg instanceof CancelTask) {
 				CancelTask cancelTask = (CancelTask) msg;
 				getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 226b256..757976d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -35,13 +35,13 @@ import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -223,7 +223,7 @@ public class ExecutionVertexCancelTest {
 							TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
 
 					Instance instance = getInstance(taskManager);
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+					SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
 					vertex.deployToSlot(slot);
 
@@ -252,11 +252,6 @@ public class ExecutionVertexCancelTest {
 					// the call did not yet execute, so it is still in canceling
 					assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-					// trigger the correcting cancel call, should properly set state to cancelled
-					actions.triggerNextAction();
-					// process onComplete callback
-					actions.triggerNextAction();
-
 					vertex.getCurrentExecutionAttempt().cancelingComplete();
 
 					assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
@@ -645,8 +640,7 @@ public class ExecutionVertexCancelTest {
 		@Override
 		public void onReceive(Object message) throws Exception {
 			if(message instanceof TaskMessages.SubmitTask){
-				TaskDeploymentDescriptor tdd = ((TaskMessages.SubmitTask) message).tasks();
-				getSender().tell(new TaskOperationResult(tdd.getExecutionId(), true), getSelf());
+				getSender().tell(Messages.getAcknowledge(), getSelf());
 			}else if(message instanceof TaskMessages.CancelTask){
 				index++;
 				if(index >= results.length){

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 7b72669..c08ae01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -22,6 +22,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 
 import static org.junit.Assert.*;
 
+import akka.actor.Actor;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
@@ -73,7 +74,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
-			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -102,7 +103,7 @@ public class ExecutionVertexDeploymentTest {
 
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -118,7 +119,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			vertex.deployToSlot(slot);
 			
-			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -131,7 +132,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -146,7 +147,7 @@ public class ExecutionVertexDeploymentTest {
 		try {
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -168,15 +169,8 @@ public class ExecutionVertexDeploymentTest {
 				fail("Scheduled from wrong state");
 			}
 			catch (IllegalStateException e) {}
-			
-			// wait until the state transition must be done
-			for (int i = 0; i < 100; i++) {
-				if (vertex.getExecutionState() != ExecutionState.RUNNING) {
-					Thread.sleep(10);
-				}
-			}
-			
-			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -187,7 +181,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
-			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -202,7 +196,7 @@ public class ExecutionVertexDeploymentTest {
 
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleFailingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -238,7 +232,7 @@ public class ExecutionVertexDeploymentTest {
 		try {
 			final JobVertexID jid = new JobVertexID();
 
-			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleFailingTaskManager.class));
 			
 			final Instance instance = getInstance(simpleTaskManager);
@@ -287,7 +281,7 @@ public class ExecutionVertexDeploymentTest {
 
 			TestingUtils.setExecutionContext(ec);
 
-			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system,
+			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
 					Props.create(SimpleAcknowledgingTaskManager.class));
 			final Instance instance = getInstance(simpleTaskManager);
 			final SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
@@ -337,7 +331,7 @@ public class ExecutionVertexDeploymentTest {
 					AkkaUtils.getDefaultTimeout());
 			final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
 
-			final TestActorRef<?> simpleTaskManager = TestActorRef.create(system, Props.create(new
+			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, Props.create(new
 					ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
 					TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
 
@@ -366,11 +360,6 @@ public class ExecutionVertexDeploymentTest {
 
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 
-			// should have sent another cancel call
-			queue.triggerNextAction();
-			// execute onComplete callback
-			queue.triggerNextAction();
-
 			assertEquals(testError, vertex.getFailureCause());
 			
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index e24a2b4..06f0e9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -129,7 +129,7 @@ public class ExecutionVertexSchedulingTest {
 	}
 	
 	@Test
-	public void testScheduleToRunning() {
+	public void testScheduleToDeploying() {
 		try {
 			TestingUtils.setCallingThreadDispatcher(system);
 			ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils
@@ -149,7 +149,7 @@ public class ExecutionVertexSchedulingTest {
 
 			// try to deploy to the slot
 			vertex.scheduleForExecution(scheduler, false);
-			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 8b7915f..056ed37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
@@ -67,8 +68,10 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -83,6 +86,8 @@ public class TaskManagerTest {
 
 	private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
 
+	private final FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
+
 	@BeforeClass
 	public static void setup() {
 		system = ActorSystem.create("TestActorSystem", TestingUtils.testConfig());
@@ -115,12 +120,12 @@ public class TaskManagerTest {
 					new ArrayList<BlobKey>(), 0);
 
 				final ActorRef tmClosure = taskManager;
-				new Within(duration("10 seconds")) {
+				new Within(d) {
 
 					@Override
 					protected void run() {
 						tmClosure.tell(new SubmitTask(tdd), getRef());
-						expectMsgEquals(new TaskOperationResult(eid, true));
+						expectMsgEquals(Messages.getAcknowledge());
 					}
 				};
 			}
@@ -172,18 +177,29 @@ public class TaskManagerTest {
 					new ArrayList<BlobKey>(), 0);
 
 				final ActorRef tm = taskManager;
-				final FiniteDuration d = duration("10 second");
 
 				new Within(d) {
 
 					@Override
 					protected void run() {
 						try {
+							Future<Object> t1Running = Patterns.ask(
+									tm,
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
+									timeout);
+							Future<Object> t2Running = Patterns.ask(
+									tm,
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
+									timeout);
+
 							tm.tell(new SubmitTask(tdd1), getRef());
 							tm.tell(new SubmitTask(tdd2), getRef());
 
-							expectMsgEquals(new TaskOperationResult(eid1, true));
-							expectMsgEquals(new TaskOperationResult(eid2, true));
+							expectMsgEquals(Messages.getAcknowledge());
+							expectMsgEquals(Messages.getAcknowledge());
+
+							Await.ready(t1Running, d);
+							Await.ready(t2Running, d);
 							
 							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 
@@ -288,34 +304,34 @@ public class TaskManagerTest {
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
-				new Within(duration("10 second")){
+				new Within(d){
 
 					@Override
 					protected void run() {
-						tm.tell(new SubmitTask(tdd1), getRef());
-						tm.tell(new SubmitTask(tdd2), getRef());
-
-						TaskOperationResult result = expectMsgClass(TaskOperationResult.class);
-						assertFalse(result.success());
-						assertEquals(eid1, result.executionID());
+						try {
+							tm.tell(new SubmitTask(tdd1), getRef());
+							tm.tell(new SubmitTask(tdd2), getRef());
 
-						result = expectMsgClass(TaskOperationResult.class);
-						assertFalse(result.success());
-						assertEquals(eid2, result.executionID());
+							expectMsgEquals(Messages.getAcknowledge());
+							expectMsgEquals(Messages.getAcknowledge());
 
-						tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
-								getRef());
-						tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
-								getRef());
+							tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+									getRef());
+							tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
+									getRef());
 
-						expectMsgEquals(true);
-						expectMsgEquals(true);
+							expectMsgEquals(true);
+							expectMsgEquals(true);
 
-						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
-						Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
-								.ResponseRunningTasks.class).asJava();
+							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
+									.ResponseRunningTasks.class).asJava();
 
-						assertEquals(0, tasks.size());
+							assertEquals(0, tasks.size());
+						} catch (Exception e){
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
 					}
 				};
 			}
@@ -378,17 +394,27 @@ public class TaskManagerTest {
 						Collections.singletonList(ircdd),
 						new ArrayList<BlobKey>(), 0);
 
-				final FiniteDuration d = duration("10 second");
-
 				new Within(d) {
 
 					@Override
 					protected void run() {
 						try {
+							Future<Object> t1Running = Patterns.ask(
+									tm,
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
+									timeout);
+							Future<Object> t2Running = Patterns.ask(
+									tm,
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
+									timeout);
+
 							tm.tell(new SubmitTask(tdd1), getRef());
-							expectMsgEquals(new TaskOperationResult(eid1, true));
+							expectMsgEquals(Messages.getAcknowledge());
 							tm.tell(new SubmitTask(tdd2), getRef());
-							expectMsgEquals(new TaskOperationResult(eid2, true));
+							expectMsgEquals(Messages.getAcknowledge());
+
+							Await.ready(t1Running, d);
+							Await.ready(t2Running, d);
 
 							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
@@ -460,7 +486,11 @@ public class TaskManagerTest {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				jobManager = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
+				jobManager = system.actorOf(
+						Props.create(
+								new SimpleLookupFailingUpdateJobManagerCreator(eid2)
+						)
+				);
 				taskManager = createTaskManager(jobManager);
 				final ActorRef tm = taskManager;
 
@@ -488,19 +518,28 @@ public class TaskManagerTest {
 						Collections.singletonList(ircdd),
 						new ArrayList<BlobKey>(), 0);
 
-				final FiniteDuration d = duration("10 second");
-
 				new Within(d){
 
 					@Override
 					protected void run() {
 						try {
-							// deploy sender before receiver, so the target is online when the sender requests the connection info
+							Future<Object> t1Running = Patterns.ask(
+									tm,
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
+									timeout);
+							Future<Object> t2Running = Patterns.ask(
+									tm,
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
+									timeout);
+
 							tm.tell(new SubmitTask(tdd2), getRef());
 							tm.tell(new SubmitTask(tdd1), getRef());
 
-							expectMsgEquals(new TaskOperationResult(eid2, true));
-							expectMsgEquals(new TaskOperationResult(eid1, true));
+							expectMsgEquals(Messages.getAcknowledge());
+							expectMsgEquals(Messages.getAcknowledge());
+
+							Await.ready(t1Running, d);
+							Await.ready(t2Running, d);
 
 							tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 							Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages
@@ -588,10 +627,23 @@ public class TaskManagerTest {
 
 	public static class SimpleLookupFailingUpdateJobManager extends SimpleLookupJobManager{
 
+		private final Set<ExecutionAttemptID> validIDs;
+
+		public SimpleLookupFailingUpdateJobManager(Set<ExecutionAttemptID> ids) {
+			this.validIDs = new HashSet<ExecutionAttemptID>(ids);
+		}
+
 		@Override
 		public void onReceive(Object message) throws Exception{
 			if (message instanceof TaskMessages.UpdateTaskExecutionState) {
-				getSender().tell(false, getSelf());
+				TaskMessages.UpdateTaskExecutionState updateMsg =
+						(TaskMessages.UpdateTaskExecutionState) message;
+
+				if(validIDs.contains(updateMsg.taskExecutionState().getID())) {
+					getSender().tell(true, getSelf());
+				} else {
+					getSender().tell(false, getSelf());
+				}
 			} else {
 				super.onReceive(message);
 			}
@@ -608,9 +660,19 @@ public class TaskManagerTest {
 
 	public static class SimpleLookupFailingUpdateJobManagerCreator implements Creator<SimpleLookupFailingUpdateJobManager>{
 
+		private final Set<ExecutionAttemptID> validIDs;
+
+		public SimpleLookupFailingUpdateJobManagerCreator(ExecutionAttemptID ... ids) {
+			validIDs = new HashSet<ExecutionAttemptID>();
+
+			for(ExecutionAttemptID id : ids) {
+				this.validIDs.add(id);
+			}
+		}
+
 		@Override
 		public SimpleLookupFailingUpdateJobManager create() throws Exception {
-			return new SimpleLookupFailingUpdateJobManager();
+			return new SimpleLookupFailingUpdateJobManager(validIDs);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 702e34c..bb0c1f9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Terminated, ActorRef}
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.Messages.Disconnect
-import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, UnregisterTask}
 import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
@@ -48,10 +49,14 @@ class TestingTaskManager(config: TaskManagerConfiguration,
   extends TaskManager(config, connectionInfo, jobManagerAkkaURL,
                       memoryManager, ioManager, network, numberOfSlots) {
 
+  import scala.collection.JavaConverters._
+
 
   val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
   val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
   val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
 
   var disconnectDisabled = false
 
@@ -64,16 +69,30 @@ class TestingTaskManager(config: TaskManagerConfiguration,
    * Handler for testing related messages
    */
   def receiveTestMessages: Receive = {
+    case NotifyWhenTaskIsRunning(executionID) => {
+      Option(runningTasks.get(executionID)) match {
+        case Some(_) => sender ! true
+        case None =>
+          val listeners = waitForRunning.getOrElse(executionID, Set())
+          waitForRunning += (executionID -> (listeners + sender))
+      }
+    }
 
     case RequestRunningTasks =>
-      sender ! ResponseRunningTasks(runningTasks.toMap)
+      sender ! ResponseRunningTasks(runningTasks.asScala.toMap)
       
     case NotifyWhenTaskRemoved(executionID) =>
-      runningTasks.get(executionID) match {
+      Option(runningTasks.get(executionID)) match {
         case Some(_) =>
           val set = waitForRemoval.getOrElse(executionID, Set())
           waitForRemoval += (executionID -> (set + sender))
-        case None => sender ! true
+        case None =>
+          if(unregisteredTasks.contains(executionID)) {
+            sender ! true
+          } else {
+              val set = waitForRemoval.getOrElse(executionID, Set())
+              waitForRemoval += (executionID -> (set + sender))
+          }
       }
       
     case UnregisterTask(executionID) =>
@@ -82,6 +101,8 @@ class TestingTaskManager(config: TaskManagerConfiguration,
         case Some(actors) => for(actor <- actors) actor ! true
         case None =>
       }
+
+      unregisteredTasks += executionID
       
     case RequestBroadcastVariablesWithReferences =>
       sender ! ResponseBroadcastVariablesWithReferences(
@@ -96,7 +117,7 @@ class TestingTaskManager(config: TaskManagerConfiguration,
       sender ! ResponseNumActiveConnections(numActive)
 
     case NotifyWhenJobRemoved(jobID) =>
-      if(runningTasks.values.exists(_.getJobID == jobID)){
+      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
         val set = waitForJobRemoval.getOrElse(jobID, Set())
         waitForJobRemoval += (jobID -> (set + sender))
         import context.dispatcher
@@ -109,7 +130,7 @@ class TestingTaskManager(config: TaskManagerConfiguration,
       }
 
     case CheckIfJobRemoved(jobID) =>
-      if(runningTasks.values.forall(_.getJobID != jobID)){
+      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
         waitForJobRemoval.remove(jobID) match {
           case Some(listeners) => listeners foreach (_ ! true)
           case None =>
@@ -147,5 +168,14 @@ class TestingTaskManager(config: TaskManagerConfiguration,
 
     case DisableDisconnect =>
       disconnectDisabled = true
+
+    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+      super.receiveWithLogMessages(msg)
+
+      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+        waitForRunning.get(taskExecutionState.getID) foreach {
+          _ foreach (_ ! true)
+        }
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb5b725/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 2051ef5..c9a2f73 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.taskmanager.Task
 object TestingTaskManagerMessages {
   
   case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
+  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
   
   case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
     import collection.JavaConverters._

