flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/8] flink git commit: [FLINK-1672] [runtime] Unify Task and RuntimeEnvironment into one class.
Date Mon, 11 May 2015 20:19:38 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 747cc85..bdefea6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskmanager
 import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
-import java.util.concurrent.{TimeUnit, FutureTask}
+import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
 import java.lang.management.{GarbageCollectorMXBean, ManagementFactory, MemoryMXBean}
 
@@ -36,16 +36,13 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
-import org.apache.flink.api.common.cache.DistributedCache
 import org.apache.flink.configuration._
-import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
-import org.apache.flink.runtime.execution.{CancelTaskException, ExecutionState, RuntimeEnvironment}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
@@ -54,7 +51,6 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver}
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
 import org.apache.flink.runtime.messages.CheckpointingMessages.{CheckpointingMessage, BarrierReq}
@@ -67,9 +63,6 @@ import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
-import org.apache.flink.util.ExceptionUtils
-
-import org.slf4j.LoggerFactory
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -136,13 +129,13 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   protected val resources = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
 
   /** Registry of all tasks currently executed by this TaskManager */
-  protected val runningTasks = new util.concurrent.ConcurrentHashMap[ExecutionAttemptID, Task]()
+  protected val runningTasks = new java.util.HashMap[ExecutionAttemptID, Task]()
 
   /** Handler for shared broadcast variables (shared between multiple Tasks) */
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache()
+  protected val fileCache = new FileCache(config.configuration)
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -282,6 +275,9 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     case Disconnect(msg) =>
       handleJobManagerDisconnect(sender(), "JobManager requested disconnect: " + msg)
+
+    case FatalError(message, cause) =>
+      killTaskManagerFatal(message, cause)
   }
 
   /**
@@ -344,12 +340,13 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       // state transition
 
       case updateMsg @ UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+        
+        // we receive these from our tasks and forward them to the JobManager
         currentJobManager foreach {
           jobManager => {
             val futureResponse = (jobManager ? updateMsg)(askTimeout)
-
+            
             val executionID = taskExecutionState.getID
-            val executionState = taskExecutionState.getExecutionState
 
             futureResponse.mapTo[Boolean].onComplete {
               // IMPORTANT: In the future callback, we cannot directly modify state
@@ -359,21 +356,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
                   self ! FailTask(executionID,
                     new Exception("Task has been cancelled on the JobManager."))
                 }
-
-                if (!result || executionState.isTerminal) {
-                  self ! UnregisterTask(executionID)
-                }
+                
               case Failure(t) =>
                 self ! FailTask(executionID, new Exception(
                   "Failed to send ExecutionStateChange notification to JobManager"))
-
-                self ! UnregisterTask(executionID)
             }(context.dispatcher)
           }
         }
 
       // removes the task from the TaskManager and frees all its resources
-      case UnregisterTask(executionID) =>
+      case TaskInFinalState(executionID) =>
         unregisterTaskAndNotifyFinalState(executionID)
 
       // starts a new task on the TaskManager
@@ -383,35 +375,22 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       // marks a task as failed for an external reason
       // external reasons are reasons other than the task code itself throwing an exception
       case FailTask(executionID, cause) =>
-        Option(runningTasks.get(executionID)) match {
-          case Some(task) =>
-
-            // execute failing operation concurrently
-            implicit val executor = context.dispatcher
-            Future {
-              task.failExternally(cause)
-            }.onFailure{
-              case t: Throwable => log.error(s"Could not fail task ${task} externally.", t)
-            }
-          case None =>
+        val task = runningTasks.get(executionID)
+        if (task != null) {
+          task.failExternally(cause)
+        } else {
+          log.debug(s"Cannot find task to fail for execution ${executionID})")
         }
 
       // cancels a task
       case CancelTask(executionID) =>
-        Option(runningTasks.get(executionID)) match {
-          case Some(task) =>
-            // execute cancel operation concurrently
-            implicit val executor = context.dispatcher
-            Future {
-              task.cancelExecution()
-            }.onFailure{
-              case t: Throwable => log.error("Could not cancel task " + task, t)
-            }
-
-            sender ! new TaskOperationResult(executionID, true)
-
-          case None =>
-            sender ! new TaskOperationResult(executionID, false,
+        val task = runningTasks.get(executionID)
+        if (task != null) {
+          task.cancelExecution()
+          sender ! new TaskOperationResult(executionID, true)
+        } else {
+          log.debug(s"Cannot find task to cancel for execution ${executionID})")
+          sender ! new TaskOperationResult(executionID, false,
               "No task with that execution ID was found.")
         }
     }
@@ -430,24 +409,11 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         log.debug(s"[FT-TaskManager] Barrier $checkpointID request received " +
           s"for attempt $attemptID.")
 
-        Option(runningTasks.get(attemptID)) match {
-          case Some(i) =>
-            if (i.getExecutionState == ExecutionState.RUNNING) {
-              i.getEnvironment.getInvokable match {
-                case barrierTransceiver: BarrierTransceiver =>
-                  new Thread(new Runnable {
-                    override def run(): Unit =
-                      barrierTransceiver.broadcastBarrierFromSource(checkpointID)
-                  }).start()
-
-                case _ => log.error("Taskmanager received a checkpoint request for " +
-                  s"non-checkpointing task $attemptID.")
-              }
-            }
-
-          case None =>
-            // may always happen in case of canceled/finished tasks
-            log.debug(s"Taskmanager received a checkpoint request for unknown task $attemptID.")
+        val task = runningTasks.get(attemptID)
+        if (task != null) {
+          task.triggerCheckpointBarrier(checkpointID)
+        } else {
+          log.debug(s"Taskmanager received a checkpoint request for unknown task $attemptID.")
         }
 
       // unknown checkpoint message
@@ -770,8 +736,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
       }
     }
   }
-
-
+  
   // --------------------------------------------------------------------------
   //  Task Operations
   // --------------------------------------------------------------------------
@@ -784,130 +749,46 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    * @param tdd TaskDeploymentDescriptor describing the task to be executed on this [[TaskManager]]
    */
   private def submitTask(tdd: TaskDeploymentDescriptor): Unit = {
-    val slot = tdd.getTargetSlotNumber
-
-    if (!isConnected) {
-      sender ! Failure(
-        new IllegalStateException("TaskManager is not associated with a JobManager.")
-      )
-    } else if (slot < 0 || slot >= numberOfSlots) {
-      sender ! Failure(new Exception(s"Target slot $slot does not exist on TaskManager."))
-    } else {
-      sender ! Acknowledge
-
-      Future {
-        initializeTask(tdd)
-      }(context.dispatcher)
-    }
-  }
-
-  /** Sets up a [[org.apache.flink.runtime.execution.RuntimeEnvironment]] for the task and starts
-    * its execution in a separate thread.
-    *
-    * @param tdd TaskDeploymentDescriptor describing the task to be executed on this [[TaskManager]]
-    */
-  private def initializeTask(tdd: TaskDeploymentDescriptor): Unit ={
-    val jobID = tdd.getJobID
-    val vertexID = tdd.getVertexID
-    val executionID = tdd.getExecutionId
-    val taskIndex = tdd.getIndexInSubtaskGroup
-    val numSubtasks = tdd.getNumberOfSubtasks
-    var startRegisteringTask = 0L
-    var task: Task = null
-
     try {
-      val userCodeClassLoader = libraryCacheManager match {
-        case Some(manager) =>
-          if (log.isDebugEnabled) {
-            startRegisteringTask = System.currentTimeMillis()
-          }
-
-          // triggers the download of all missing jar files from the job manager
-          manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
-
-          if (log.isDebugEnabled) {
-          log.debug(s"Register task $executionID at library cache manager " +
-            s"took ${(System.currentTimeMillis() - startRegisteringTask) / 1000.0}s")
-          }
-
-          manager.getClassLoader(jobID)
-        case None => throw new IllegalStateException("There is no valid library cache manager.")
-      }
-
-      if (userCodeClassLoader == null) {
-        throw new RuntimeException("No user code Classloader available.")
-      }
-
-      task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
-        tdd.getTaskName, self)
-
-      Option(runningTasks.put(executionID, task)) match {
-        case Some(_) => throw new RuntimeException(
-          s"TaskManager contains already a task with executionID $executionID.")
+      // grab some handles and sanity check on the fly
+      val jobManagerActor = currentJobManager match {
+        case Some(jm) => jm
         case None =>
+          throw new IllegalStateException("TaskManager is not associated with a JobManager.")
       }
-
-      val env = currentJobManager match {
-        case Some(jobManager) =>
-          val splitProvider = new TaskInputSplitProvider(jobManager, jobID, vertexID,
-            executionID, userCodeClassLoader, askTimeout)
-
-          new RuntimeEnvironment(jobManager, task, tdd, userCodeClassLoader,
-            memoryManager, ioManager, splitProvider, bcVarManager, network)
-
-        case None => throw new IllegalStateException(
-          "TaskManager has not yet been registered at a JobManager.")
-      }
-
-      task.setEnvironment(env)
-
-      //inject operator state
-      if (tdd.getOperatorStates != null) {
-        task.getEnvironment.getInvokable match {
-          case opStateCarrier: OperatorStateCarrier =>
-            opStateCarrier.injectState(tdd.getOperatorStates)
-        }
+      val libCache = libraryCacheManager match {
+        case Some(manager) => manager
+        case None => throw new IllegalStateException("There is no valid library cache manager.")
       }
 
-      // register the task with the network stack and profiles
-      log.info(s"Register task $task.")
-      network.registerTask(task)
-
-      val cpTasks = new util.HashMap[String, FutureTask[Path]]()
-
-      for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration).asScala) {
-        val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
-        cpTasks.put(entry.getKey, cp)
+      val slot = tdd.getTargetSlotNumber
+      if (slot < 0 || slot >= numberOfSlots) {
+        throw new IllegalArgumentException(s"Target slot $slot does not exist on TaskManager.")
       }
-      env.addCopyTasksForCacheFile(cpTasks)
 
-      if (!task.startExecution()) {
-        throw new RuntimeException("Cannot start task. Task was canceled or failed.")
+      // create the task. this does not grab any TaskManager resources or download
+      // any libraries - the operation does not block
+      val execId = tdd.getExecutionId
+      val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
+                          self, jobManagerActor, config.timeout, libCache, fileCache)
+      
+      // add the task to the map
+      val prevTask = runningTasks.put(execId, task)
+      if (prevTask != null) {
+        // already have a task for that ID, put it back and report an error
+        runningTasks.put(execId, prevTask)
+        throw new IllegalStateException("TaskManager already contains a task for id " + execId)
       }
-
-      self ! UpdateTaskExecutionState(
-        new TaskExecutionState(jobID, executionID, ExecutionState.RUNNING)
-      )
-    } catch {
-      case t: Throwable =>
-        if (!t.isInstanceOf[CancelTaskException]) {
-          log.error("Could not instantiate task with execution ID " + executionID, t)
-        }
-
-        try {
-          if (task != null) {
-            task.failExternally(t)
-            removeAllTaskResources(task)
-          }
-
-          libraryCacheManager foreach { _.unregisterTask(jobID, executionID) }
-        } catch {
-          case t: Throwable => log.error("Error during cleanup of task deployment.", t)
-        }
-
-        self ! UpdateTaskExecutionState(
-          new TaskExecutionState(jobID, executionID, ExecutionState.FAILED, t)
-        )
+      
+      // all good, we kick off the task, which performs its own initialization
+      task.startTaskThread()
+      
+      sender ! Acknowledge
+    }
+    catch {
+      case t: Throwable => 
+        log.error("SubmitTask failed", t)
+        sender ! Failure(t)
     }
   }
 
@@ -927,19 +808,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         val errors: Seq[String] = partitionInfos.flatMap { info =>
 
           val (resultID, partitionInfo) = info
-          val reader = task.getEnvironment.getInputGateById(resultID)
+          val reader = task.getInputGateById(resultID)
 
           if (reader != null) {
             Future {
               try {
                 reader.updateInputChannel(partitionInfo)
-              } catch {
+              }
+              catch {
                 case t: Throwable =>
                   log.error(s"Could not update input data location for task " +
                     s"${task.getTaskName}. Trying to fail  task.", t)
 
                   try {
-                    task.markFailed(t)
+                    task.failExternally(t)
                   }
                   catch {
                     case t: Throwable =>
@@ -977,20 +859,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   private def cancelAndClearEverything(cause: Throwable) {
     if (runningTasks.size > 0) {
       log.info("Cancelling all computations and discarding all cached data.")
-
+      
       for (t <- runningTasks.values().asScala) {
         t.failExternally(cause)
-        unregisterTaskAndNotifyFinalState(t.getExecutionId)
       }
+      runningTasks.clear()
     }
   }
 
   private def unregisterTaskAndNotifyFinalState(executionID: ExecutionAttemptID): Unit = {
 
-    Option(runningTasks.remove(executionID)) match {
-      case Some(task) =>
-
-        // mark the task as failed if it is not yet in a final state
+    val task = runningTasks.remove(executionID)
+    if (task != null) {
+      
+        // the task must be in a terminal state
         if (!task.getExecutionState.isTerminal) {
           try {
             task.failExternally(new Exception("Task is being removed from TaskManager"))
@@ -999,66 +881,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           }
         }
 
-        log.info(s"Unregister task with execution ID $executionID.")
-        removeAllTaskResources(task)
-        libraryCacheManager foreach { _.unregisterTask(task.getJobID, executionID) }
-
-        log.info(s"Updating FINAL execution state of ${task.getTaskName} " +
-          s"(${task.getExecutionId}) to ${task.getExecutionState}.")
+        log.info(s"Unregistering task and sending final execution state " +
+          s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " +
+          s"(${task.getExecutionId})")
 
         self ! UpdateTaskExecutionState(new TaskExecutionState(
           task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
-
-      case None =>
-        log.debug(s"Cannot find task with ID $executionID to unregister.")
     }
-  }
-
-  /**
-   * This method cleans up the resources of a task in the distributed cache,
-   * network stack and the memory manager.
-   *
-   * If the cleanup in the network stack or memory manager fails, this is considered
-   * a fatal problem (critical resource leak) and causes the TaskManager to quit.
-   * A TaskManager JVM restart is the best safe way to fix that error.
-   *
-   * @param task The Task whose resources should be cleared.
-   */
-  private def removeAllTaskResources(task: Task): Unit = {
-
-    // release the critical things first, and fail fatally if it does not work
-
-    // this releases all task resources, like buffer pools and intermediate result
-    // partitions being built. If this fails, the TaskManager is in serious trouble,
-    // as this is a massive resource leak. We kill the TaskManager in that case,
-    // to recover through a clean JVM start
-    try {
-      network.unregisterTask(task)
-    } catch {
-      case t: Throwable =>
-        killTaskManagerFatal("Failed to unregister task resources from network stack", t)
-    }
-
-    // safety net to release all the task's memory
-    try {
-      task.unregisterMemoryManager(memoryManager)
-    } catch {
-      case t: Throwable =>
-        killTaskManagerFatal("Failed to unregister task memory from memory manager", t)
-    }
-
-    // release temp files from the distributed cache
-    if (task.getEnvironment != null) {
-      try {
-        for (entry <- DistributedCache.readFileInfoFromConfig(
-          task.getEnvironment.getJobConfiguration).asScala) {
-          fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
-        }
-      } catch {
-        // this is pretty unpleasant, but not a reason to give up immediately
-        case e: Exception => log.error(
-          "Error cleaning up local temp files from the distributed cache.", e)
-      }
+    else {
+      log.error(s"Cannot find task with ID $executionID to unregister.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index d13db05..fdf41f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
 import org.junit.Test;
 
 import java.util.Collections;
@@ -93,7 +94,7 @@ public class LocalInputChannelTest {
 			partitionIds[i] = new ResultPartitionID();
 
 			final ResultPartition partition = new ResultPartition(
-					mock(Environment.class),
+					"Test Name",
 					jobId,
 					partitionIds[i],
 					ResultPartitionType.PIPELINED,
@@ -222,7 +223,7 @@ public class LocalInputChannelTest {
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
 
 			this.inputGate = new SingleInputGate(
-					mock(Environment.class),
+					"Test Name",
 					new IntermediateDataSetID(),
 					subpartitionIndex,
 					numberOfInputChannels);

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index be66aeb..9a7ffe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -22,8 +22,6 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -61,7 +59,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				mock(Environment.class), new IntermediateDataSetID(), 0, 2);
+				"Test Task Name", new IntermediateDataSetID(), 0, 2);
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -107,7 +105,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate(mock(Environment.class), resultId, 0, 2);
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", resultId, 0, 2);
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index efc02de..2dafaa2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -30,7 +29,6 @@ import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 /**
@@ -50,7 +48,7 @@ public class TestSingleInputGate {
 		checkArgument(numberOfInputChannels >= 1);
 
 		this.inputGate = spy(new SingleInputGate(
-				mock(Environment.class), new IntermediateDataSetID(), 0, numberOfInputChannels));
+				"Test Task Name", new IntermediateDataSetID(), 0, numberOfInputChannels));
 
 		this.inputChannels = new TestInputChannel[numberOfInputChannels];
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 90132ff..050f43a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 public class UnionInputGateTest {
 
@@ -39,9 +38,9 @@ public class UnionInputGateTest {
 	@Test(timeout = 120 * 1000)
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
-		final Environment env = mock(Environment.class);
-		final SingleInputGate ig1 = new SingleInputGate(env, new IntermediateDataSetID(), 0, 3);
-		final SingleInputGate ig2 = new SingleInputGate(env, new IntermediateDataSetID(), 0, 5);
+		final String testTaskName = "Test Task";
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new IntermediateDataSetID(), 0, 3);
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new IntermediateDataSetID(), 0, 5);
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 16cd66e..735f67e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
+import akka.actor.ActorRef;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -261,4 +262,9 @@ public class MockEnvironment implements Environment {
 	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
 		// discard, this is only for testing
 	}
+
+	@Override
+	public ActorRef getJobManager() {
+		return ActorRef.noSender();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActor.java
new file mode 100644
index 0000000..70e6f22
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.UntypedActor;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Actor for testing that simply puts all its messages into a 
+ * blocking queue.
+ */
+class ForwardingActor extends UntypedActor {
+
+	private final BlockingQueue<Object> queue;
+	
+	public ForwardingActor(BlockingQueue<Object> queue) {
+		this.queue = queue;
+	}
+
+	@Override
+	public void onReceive(Object message) {
+		queue.add(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
index 9d10591..f4c7a57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.taskmanager;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -84,4 +86,35 @@ public class TaskExecutionStateTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void hanleNonSerializableException() {
+		try {
+			@SuppressWarnings({"ThrowableInstanceNeverThrown", "serial"})
+			Exception hostile = new Exception() {
+				// should be non-serializable, because it contains the outer class reference
+				
+				@Override
+				public String getMessage() {
+					throw new RuntimeException("Cannot get Message");
+				}
+
+				@Override
+				public void printStackTrace(PrintStream s) {
+					throw new RuntimeException("Cannot print");
+				}
+
+				@Override
+				public void printStackTrace(PrintWriter s) {
+					throw new RuntimeException("Cannot print");
+				}
+			};
+
+			new TaskExecutionState(new JobID(), new ExecutionAttemptID(), ExecutionState.FAILED, hostile);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 056ed37..d84cb37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -29,6 +29,7 @@ import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -56,11 +57,14 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -75,42 +79,61 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class TaskManagerTest {
 
-	private static ActorSystem system;
-
-	private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
+	
+	private static final Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
 
-	private final FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
+	private static final FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
 
+	private static ActorSystem system;
+	
 	@BeforeClass
 	public static void setup() {
-		system = ActorSystem.create("TestActorSystem", TestingUtils.testConfig());
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
 	}
 
 	@AfterClass
 	public static void teardown() {
 		JavaTestKit.shutdownActorSystem(system);
-		system = null;
 	}
 	
 	@Test
-	public void testSetupTaskManager() {
+	public void testSubmitAndExecuteTask() {
+		
+		LOG.info(	"--------------------------------------------------------------------\n" + 
+					"     Starting testSubmitAndExecuteTask() \n" + 
+					"--------------------------------------------------------------------");
+		
+		
 		new JavaTestKit(system){{
-			ActorRef jobManager = null;
+
 			ActorRef taskManager = null;
+			
 			try {
-				jobManager = system.actorOf(Props.create(SimpleJobManager.class));
-
-				taskManager = createTaskManager(jobManager);
+				taskManager = createTaskManager(getTestActor(), false);
+				final ActorRef tmClosure = taskManager;
+				
+				// handle the registration
+				new Within(d) {
+					@Override
+					protected void run() {
+						expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
+						
+						final InstanceID iid = new InstanceID();
+						assertEquals(tmClosure, getLastSender());
+						tmClosure.tell(new RegistrationMessages.AcknowledgeRegistration(
+								getTestActor(), iid, 12345), getTestActor());
+					}
+				};
 
-				JobID jid = new JobID();
-				JobVertexID vid = new JobVertexID();
+				final JobID jid = new JobID();
+				final JobVertexID vid = new JobVertexID();
 				final ExecutionAttemptID eid = new ExecutionAttemptID();
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
@@ -119,13 +142,54 @@ public class TaskManagerTest {
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
-				final ActorRef tmClosure = taskManager;
+				
 				new Within(d) {
 
 					@Override
 					protected void run() {
 						tmClosure.tell(new SubmitTask(tdd), getRef());
-						expectMsgEquals(Messages.getAcknowledge());
+						
+						// TaskManager should acknowledge the submission
+						// heartbeats may be interleaved
+						long deadline = System.currentTimeMillis() + 10000;
+						do {
+							Object message = receiveOne(d);
+							if (message == Messages.getAcknowledge()) {
+								break;
+							}
+						} while (System.currentTimeMillis() < deadline);
+
+						// task should have switched to running
+						Object toRunning = new TaskMessages.UpdateTaskExecutionState(
+								new TaskExecutionState(jid, eid, ExecutionState.RUNNING));
+
+						// task should have switched to finished
+						Object toFinished = new TaskMessages.UpdateTaskExecutionState(
+								new TaskExecutionState(jid, eid, ExecutionState.FINISHED));
+						
+						deadline = System.currentTimeMillis() + 10000;
+						do {
+							Object message = receiveOne(d);
+							if (message.equals(toRunning)) {
+								break;
+							}
+							else if (!(message instanceof TaskManagerMessages.Heartbeat)) {
+								fail("Unexpected message: " + message);
+							}
+						} while (System.currentTimeMillis() < deadline);
+
+						deadline = System.currentTimeMillis() + 10000;
+						do {
+							Object message = receiveOne(d);
+							if (message.equals(toFinished)) {
+								break;
+							}
+							else if (!(message instanceof TaskManagerMessages.Heartbeat)) {
+								fail("Unexpected message: " + message);
+							}
+						} while (System.currentTimeMillis() < deadline);
+						
+						
 					}
 				};
 			}
@@ -138,22 +202,24 @@ public class TaskManagerTest {
 				if (taskManager != null) {
 					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
 				}
-				if (jobManager != null) {
-					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
-				}
 			}
 		}};
 	}
 	
 	@Test
 	public void testJobSubmissionAndCanceling() {
+
+		LOG.info(	"--------------------------------------------------------------------\n" +
+					"     Starting testJobSubmissionAndCanceling() \n" +
+					"--------------------------------------------------------------------");
+		
 		new JavaTestKit(system){{
 
 			ActorRef jobManager = null;
 			ActorRef taskManager = null;
 			try {
 				jobManager = system.actorOf(Props.create(SimpleJobManager.class));
-				taskManager = createTaskManager(jobManager);
+				taskManager = createTaskManager(jobManager, true);
 
 				final JobID jid1 = new JobID();
 				final JobID jid2 = new JobID();
@@ -274,6 +340,11 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testGateChannelEdgeMismatch() {
+
+		LOG.info(	"--------------------------------------------------------------------\n" +
+					"     Starting testGateChannelEdgeMismatch() \n" +
+					"--------------------------------------------------------------------");
+		
 		new JavaTestKit(system){{
 
 			ActorRef jobManager = null;
@@ -281,7 +352,7 @@ public class TaskManagerTest {
 			try {
 				jobManager = system.actorOf(Props.create(SimpleJobManager.class));
 
-				taskManager = createTaskManager(jobManager);
+				taskManager = createTaskManager(jobManager, true);
 				final ActorRef tm = taskManager;
 
 				final JobID jid = new JobID();
@@ -353,6 +424,11 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testRunJobWithForwardChannel() {
+
+		LOG.info(	"--------------------------------------------------------------------\n" +
+					"     Starting testRunJobWithForwardChannel() \n" +
+					"--------------------------------------------------------------------");
+		
 		new JavaTestKit(system){{
 
 			ActorRef jobManager = null;
@@ -368,7 +444,7 @@ public class TaskManagerTest {
 
 				jobManager = system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
 
-				taskManager = createTaskManager(jobManager);
+				taskManager = createTaskManager(jobManager, true);
 				final ActorRef tm = taskManager;
 
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
@@ -470,6 +546,10 @@ public class TaskManagerTest {
 	@Test
 	public void testCancellingDependentAndStateUpdateFails() {
 
+		LOG.info(	"--------------------------------------------------------------------\n" +
+					"     Starting testCancellingDependentAndStateUpdateFails() \n" +
+					"--------------------------------------------------------------------");
+
 		// this tests creates two tasks. the sender sends data, and fails to send the
 		// state update back to the job manager
 		// the second one blocks to be canceled
@@ -491,7 +571,7 @@ public class TaskManagerTest {
 								new SimpleLookupFailingUpdateJobManagerCreator(eid2)
 						)
 				);
-				taskManager = createTaskManager(jobManager);
+				taskManager = createTaskManager(jobManager, true);
 				final ActorRef tm = taskManager;
 
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
@@ -676,7 +756,7 @@ public class TaskManagerTest {
 		}
 	}
 
-	public static ActorRef createTaskManager(ActorRef jobManager) {
+	public static ActorRef createTaskManager(ActorRef jobManager, boolean waitForRegistration) {
 		ActorRef taskManager = null;
 		try {
 			Configuration cfg = new Configuration();
@@ -695,16 +775,18 @@ public class TaskManagerTest {
 			fail("Could not create test TaskManager: " + e.getMessage());
 		}
 
-		Future<Object> response = Patterns.ask(taskManager, 
-				TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
-
-		try {
-			FiniteDuration d = new FiniteDuration(100, TimeUnit.SECONDS);
-			Await.ready(response, d);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Exception while waiting for the task manager registration: " + e.getMessage());
+		if (waitForRegistration) {
+			Future<Object> response = Patterns.ask(taskManager, 
+					TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
+	
+			try {
+				FiniteDuration d = new FiniteDuration(100, TimeUnit.SECONDS);
+				Await.ready(response, d);
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail("Exception while waiting for the task manager registration: " + e.getMessage());
+			}
 		}
 
 		return taskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 3a8fcd8..4492372 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,72 +19,277 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.messages.TaskMessages;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doNothing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the Task, which make sure that correct state transitions happen,
+ * and failures are correctly handled.
+ *
+ * All tests here have a set of mock actors for TaskManager, JobManager, and
+ * execution listener, which simply put the messages in a queue to be picked
+ * up by the test and validated.
+ */
 public class TaskTest {
+	
+	private static ActorSystem actorSystem;
+	
+	private static OneShotLatch awaitLatch;
+	private static OneShotLatch triggerLatch;
+	
+	private ActorRef taskManagerMock;
+	private ActorRef jobManagerMock;
+	private ActorRef listenerActor;
+
+	private BlockingQueue<Object> taskManagerMessages;
+	private BlockingQueue<Object> jobManagerMessages;
+	private BlockingQueue<Object> listenerMessages;
+	
+	// ------------------------------------------------------------------------
+	//  Init & Shutdown
+	// ------------------------------------------------------------------------
 
+	@BeforeClass
+	public static void startActorSystem() {
+		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+	
+	@AfterClass
+	public static void shutdown() {
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+	
+	@Before
+	public void createQueuesAndActors() {
+		taskManagerMessages = new LinkedBlockingQueue<Object>();
+		jobManagerMessages = new LinkedBlockingQueue<Object>();
+		listenerMessages = new LinkedBlockingQueue<Object>();
+		taskManagerMock = actorSystem.actorOf(Props.create(ForwardingActor.class, taskManagerMessages));
+		jobManagerMock = actorSystem.actorOf(Props.create(ForwardingActor.class, jobManagerMessages));
+		listenerActor = actorSystem.actorOf(Props.create(ForwardingActor.class, listenerMessages));
+		
+		awaitLatch = new OneShotLatch();
+		triggerLatch = new OneShotLatch();
+	}
+
+	@After
+	public void clearActorsAndMessages() {
+		jobManagerMessages = null;
+		taskManagerMessages = null;
+		listenerMessages = null;
+		taskManagerMock.tell(Kill.getInstance(), ActorRef.noSender());
+		jobManagerMock.tell(Kill.getInstance(), ActorRef.noSender());
+		listenerActor.tell(Kill.getInstance(), ActorRef.noSender());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
 	@Test
-	public void testTaskStates() {
+	public void testRegularExecution() {
 		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			Task task = createTask(TestInvokableCorrect.class);
 			
-			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
+			// task should be new and perfect
+			assertEquals(ExecutionState.CREATED, task.getExecutionState());
+			assertFalse(task.isCanceledOrFailed());
+			assertNull(task.getFailureCause());
 			
-			Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).unregisterTask();
-			task.setEnvironment(env);
+			task.registerExecutionListener(listenerActor);
 			
-			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
+			// go into the run method. we should switch to DEPLOYING, RUNNING, then
+			// FINISHED, and all should be good
+			task.run();
 			
-			// cancel
-			task.cancelExecution();
-			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			// verify final state
+			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+			assertFalse(task.isCanceledOrFailed());
+			assertNull(task.getFailureCause());
 			
-			// cannot go into running or finished state
+			// verify listener messages
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.FINISHED, task, false);
 			
-			assertFalse(task.startExecution());
-			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			// make sure that the TaskManager received a message to unregister the task
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCancelRightAway() {
+		try {
+			Task task = createTask(TestInvokableCorrect.class);
+			task.cancelExecution();
+
+			assertEquals(ExecutionState.CANCELING, task.getExecutionState());
 			
-			assertFalse(task.markAsFinished());
+			task.run();
+
+			// verify final state
 			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			validateUnregisterTask(task.getExecutionId());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailExternallyRightAway() {
+		try {
+			Task task = createTask(TestInvokableCorrect.class);
+			task.failExternally(new Exception("fail externally"));
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+
+			task.run();
+
+			// verify final state
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			validateUnregisterTask(task.getExecutionId());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testLibraryCacheRegistrationFailed() {
+		try {
+			Task task = createTask(TestInvokableCorrect.class, mock(LibraryCacheManager.class));
+
+			// task should be new and perfect
+			assertEquals(ExecutionState.CREATED, task.getExecutionState());
+			assertFalse(task.isCanceledOrFailed());
+			assertNull(task.getFailureCause());
+
+			task.registerExecutionListener(listenerActor);
+
+			// should fail
+			task.run();
+
+			// verify final state
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertNotNull(task.getFailureCause());
+			assertTrue(task.getFailureCause().getMessage().contains("classloader"));
+
+			// verify listener messages
+			validateListenerMessage(ExecutionState.FAILED, task, true);
+
+			// make sure that the TaskManager received a message to unregister the task
+			validateUnregisterTask(task.getExecutionId());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testExecutionFailsInNetworkRegistration() {
+		try {
+			// mock a working library cache
+			LibraryCacheManager libCache = mock(LibraryCacheManager.class);
+			when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+			
+			// mock a network manager that rejects registration
+			ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+			ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+			NetworkEnvironment network = mock(NetworkEnvironment.class);
+			when(network.getPartitionManager()).thenReturn(partitionManager);
+			when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
+			
+			Task task = createTask(TestInvokableCorrect.class, libCache, network);
+
+			task.registerExecutionListener(listenerActor);
+
+			task.run();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("buffers"));
 			
-			task.markFailed(new Exception("test"));
-			assertTrue(ExecutionState.CANCELED == task.getExecutionState());
+			validateUnregisterTask(task.getExecutionId());
+			validateListenerMessage(ExecutionState.FAILED, task, true);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
-			verify(task).unregisterTask();
+	@Test
+	public void testInvokableInstantiationFailed() {
+		try {
+			Task task = createTask(InvokableNonInstantiable.class);
+			task.registerExecutionListener(listenerActor);
+
+			task.run();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("instantiate"));
+
+			validateUnregisterTask(task.getExecutionId());
+			validateListenerMessage(ExecutionState.FAILED, task, true);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -93,48 +298,19 @@ public class TaskTest {
 	}
 	
 	@Test
-	public void testTaskStartFinish() {
+	public void testExecutionFailsInRegisterInputOutput() {
 		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-			final ExecutionAttemptID eid = new ExecutionAttemptID();
-			
-			final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).unregisterTask();
-			
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-			
-			Thread operation = new Thread() {
-				@Override
-				public void run() {
-					try {
-						assertTrue(task.markAsFinished());
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			
-			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
-			when(env.getExecutingThread()).thenReturn(operation);
-			
-			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
-			
-			// start the execution
-			task.setEnvironment(env);
-			task.startExecution();
-			
-			// wait for the execution to be finished
-			operation.join();
-			
-			if (error.get() != null) {
-				ExceptionUtils.rethrow(error.get());
-			}
-			
-			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+			Task task = createTask(InvokableWithExceptionInRegisterInOut.class);
+			task.registerExecutionListener(listenerActor);
 
-			verify(task).unregisterTask();
+			task.run();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("registerInputOutput"));
+
+			validateUnregisterTask(task.getExecutionId());
+			validateListenerMessage(ExecutionState.FAILED, task, true);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -143,48 +319,22 @@ public class TaskTest {
 	}
 	
 	@Test
-	public void testTaskFailesInRunning() {
+	public void testExecutionFailsInInvoke() {
 		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-			final ExecutionAttemptID eid = new ExecutionAttemptID();
-			
-			final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).unregisterTask();
-
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-			
-			Thread operation = new Thread() {
-				@Override
-				public void run() {
-					try {
-						task.markFailed(new Exception("test exception message"));
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			
-			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
-			when(env.getExecutingThread()).thenReturn(operation);
+			Task task = createTask(InvokableWithExceptionInInvoke.class);
+			task.registerExecutionListener(listenerActor);
 			
-			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
-			
-			// start the execution
-			task.setEnvironment(env);
-			task.startExecution();
-			
-			// wait for the execution to be finished
-			operation.join();
-			
-			if (error.get() != null) {
-				ExceptionUtils.rethrow(error.get());
-			}
-			
-			// make sure the final state is correct and the task manager knows the changes
+			task.run();
+
 			assertEquals(ExecutionState.FAILED, task.getExecutionState());
-			verify(task).unregisterTask();
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("test"));
+
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
+			
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.FAILED, task, true);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -193,59 +343,180 @@ public class TaskTest {
 	}
 	
 	@Test
-	public void testTaskCanceledInRunning() {
+	public void testCancelDuringRegisterInputOutput() {
 		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-			final ExecutionAttemptID eid = new ExecutionAttemptID();
-
-			final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).unregisterTask();
-			
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-			
-			// latches to create a deterministic order of events
-			final OneShotLatch toRunning = new OneShotLatch();
-			final OneShotLatch afterCanceling = new OneShotLatch();
-			
-			Thread operation = new Thread() {
-				@Override
-				public void run() {
-					try {
-						toRunning.trigger();
-						afterCanceling.await();
-						assertFalse(task.markAsFinished());
-						task.cancelingDone();
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
+			Task task = createTask(InvokableBlockingInRegisterInOut.class);
+			task.registerExecutionListener(listenerActor);
+
+			// run the task asynchronously
+			task.startTaskThread();
 			
-			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
-			when(env.getExecutingThread()).thenReturn(operation);
+			// wait till the task is in regInOut
+			awaitLatch.await();
 			
-			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
+			task.cancelExecution();
+			assertEquals(ExecutionState.CANCELING, task.getExecutionState());
+			triggerLatch.trigger();
 			
-			// start the execution
-			task.setEnvironment(env);
-			task.startExecution();
+			task.getExecutingThread().join();
+
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertNull(task.getFailureCause());
 			
-			toRunning.await();
+			validateUnregisterTask(task.getExecutionId());
+			validateListenerMessage(ExecutionState.CANCELING, task, false);
+			validateListenerMessage(ExecutionState.CANCELED, task, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailDuringRegisterInputOutput() {
+		try {
+			Task task = createTask(InvokableBlockingInRegisterInOut.class);
+			task.registerExecutionListener(listenerActor);
+
+			// run the task asynchronously
+			task.startTaskThread();
+
+			// wait till the task is in regInOut
+			awaitLatch.await();
+
+			task.failExternally(new Exception("test"));
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			triggerLatch.trigger();
+
+			task.getExecutingThread().join();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("test"));
+
+			validateUnregisterTask(task.getExecutionId());
+			validateListenerMessage(ExecutionState.FAILED, task, true);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCancelDuringInvoke() {
+		try {
+			Task task = createTask(InvokableBlockingInInvoke.class);
+			task.registerExecutionListener(listenerActor);
+
+			// run the task asynchronously
+			task.startTaskThread();
+
+			// wait till the task is in invoke
+			awaitLatch.await();
+
 			task.cancelExecution();
-			afterCanceling.trigger();
+			assertTrue(task.getExecutionState() == ExecutionState.CANCELING ||
+					task.getExecutionState() == ExecutionState.CANCELED);
+
+			task.getExecutingThread().join();
+
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertNull(task.getFailureCause());
+
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
 			
-			// wait for the execution to be finished
-			operation.join();
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.CANCELING, task, false);
+			validateListenerMessage(ExecutionState.CANCELED, task, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailExternallyDuringInvoke() {
+		try {
+			Task task = createTask(InvokableBlockingInInvoke.class);
+			task.registerExecutionListener(listenerActor);
+
+			// run the task asynchronously
+			task.startTaskThread();
+
+			// wait till the task is in invoke
+			awaitLatch.await();
+
+			task.failExternally(new Exception("test"));
+			assertTrue(task.getExecutionState() == ExecutionState.FAILED);
+
+			task.getExecutingThread().join();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("test"));
+
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
+
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.FAILED, task, true);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCanceledAfterExecutionFailedInRegInOut() {
+		try {
+			Task task = createTask(InvokableWithExceptionInRegisterInOut.class);
+			task.registerExecutionListener(listenerActor);
+
+			task.run();
 			
-			if (error.get() != null) {
-				ExceptionUtils.rethrow(error.get());
-			}
+			// this should not overwrite the failure state
+			task.cancelExecution();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("registerInputOutput"));
+
+			validateUnregisterTask(task.getExecutionId());
+			validateListenerMessage(ExecutionState.FAILED, task, true);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCanceledAfterExecutionFailedInInvoke() {
+		try {
+			Task task = createTask(InvokableWithExceptionInInvoke.class);
+			task.registerExecutionListener(listenerActor);
+
+			task.run();
+
+			// this should not overwrite the failure state
+			task.cancelExecution();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("test"));
+
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
 			
-			// make sure the final state is correct and the task manager knows the changes
-			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
-			verify(task).unregisterTask();
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.FAILED, task, true);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -254,80 +525,206 @@ public class TaskTest {
 	}
 	
 	@Test
-	public void testTaskWithEnvironment() {
+	public void testExecutionFailesAfterCanceling() {
 		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-			final ExecutionAttemptID eid = new ExecutionAttemptID();
-			
-			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
-					new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
-					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-					Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
-			
-			Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).unregisterTask();
+			Task task = createTask(InvokableWithExceptionOnTrigger.class);
+			task.registerExecutionListener(listenerActor);
+
+			// run the task asynchronously
+			task.startTaskThread();
+
+			// wait till the task is in invoke
+			awaitLatch.await();
+
+			task.cancelExecution();
+			assertEquals(ExecutionState.CANCELING, task.getExecutionState());
 			
-			RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
-					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
-					new BroadcastVariableManager(), MockNetworkEnvironment.getMock());
+			// this causes an exception
+			triggerLatch.trigger();
+
+			task.getExecutingThread().join();
 
-			task.setEnvironment(env);
+			// we should still be in state canceled
+			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertNull(task.getFailureCause());
 			
-			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
+
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.CANCELING, task, false);
+			validateListenerMessage(ExecutionState.CANCELED, task, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testExecutionFailesAfterTaskMarkedFailed() {
+		try {
+			Task task = createTask(InvokableWithExceptionOnTrigger.class);
+			task.registerExecutionListener(listenerActor);
+
+			// run the task asynchronously
+			task.startTaskThread();
+
+			// wait till the task is in invoke
+			awaitLatch.await();
+
+			task.failExternally(new Exception("external"));
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+
+			// this causes an exception
+			triggerLatch.trigger();
+
+			task.getExecutingThread().join();
 			
-			task.startExecution();
-			task.getEnvironment().getExecutingThread().join();
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+			assertTrue(task.getFailureCause().getMessage().contains("external"));
 			
-			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
 
-			verify(task).unregisterTask();
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.FAILED, task, true);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
+
+	private Task createTask(Class<? extends AbstractInvokable> invokable) {
+		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
+		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+		return createTask(invokable, libCache);
+	}
+
+	private Task createTask(Class<? extends AbstractInvokable> invokable,
+							LibraryCacheManager libCache) {
+
+		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		NetworkEnvironment network = mock(NetworkEnvironment.class);
+		when(network.getPartitionManager()).thenReturn(partitionManager);
+		when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
+		
+		return createTask(invokable, libCache, network);
+	}
 	
-	@Test
-	public void testTaskWithEnvironmentAndException() {
+	private Task createTask(Class<? extends AbstractInvokable> invokable,
+							LibraryCacheManager libCache,
+							NetworkEnvironment networkEnvironment) {
+		
+		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable);
+		
+		return new Task(tdd,
+						mock(MemoryManager.class),
+						mock(IOManager.class),
+						networkEnvironment,
+						mock(BroadcastVariableManager.class),
+						taskManagerMock, jobManagerMock,
+						new FiniteDuration(60, TimeUnit.SECONDS),
+						libCache,
+						mock(FileCache.class));
+	}
+
+	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
+		return new TaskDeploymentDescriptor(
+				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+				"Test Task", 0, 1,
+				new Configuration(), new Configuration(),
+				invokable.getName(),
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				Collections.<BlobKey>emptyList(),
+				0);
+	}
+
+	// ------------------------------------------------------------------------
+	// Validation Methods
+	// ------------------------------------------------------------------------
+	
+	private void validateUnregisterTask(ExecutionAttemptID id) {
 		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-			final ExecutionAttemptID eid = new ExecutionAttemptID();
+			// we may have to wait for a bit to give the actors time to receive the message
+			// and put it into the queue
+			Object rawMessage = taskManagerMessages.poll(10, TimeUnit.SECONDS);
 			
-			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
-					new Configuration(), new Configuration(), TestInvokableWithException.class.getName(),
-					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-					Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+			assertNotNull("There is no additional TaskManager message", rawMessage);
+			assertTrue("TaskManager message is not 'UnregisterTask'", rawMessage instanceof TaskMessages.TaskInFinalState);
 			
-			Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).unregisterTask();
-			
-			RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
-					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
-					new BroadcastVariableManager(), MockNetworkEnvironment.getMock());
+			TaskMessages.TaskInFinalState message = (TaskMessages.TaskInFinalState) rawMessage;
+			assertEquals(id, message.executionID());
+		}
+		catch (InterruptedException e) {
+			fail("interrupted");
+		}
+	}
 
-			task.setEnvironment(env);
+	private void validateTaskManagerStateChange(ExecutionState state, Task task, boolean hasError) {
+		try {
+			// we may have to wait for a bit to give the actors time to receive the message
+			// and put it into the queue
+			Object rawMessage = taskManagerMessages.poll(10, TimeUnit.SECONDS);
+
+			assertNotNull("There is no additional TaskManager message", rawMessage);
+			assertTrue("TaskManager message is not 'UpdateTaskExecutionState'", 
+					rawMessage instanceof TaskMessages.UpdateTaskExecutionState);
 			
-			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
+			TaskMessages.UpdateTaskExecutionState message =
+					(TaskMessages.UpdateTaskExecutionState) rawMessage;
 			
-			task.startExecution();
-			task.getEnvironment().getExecutingThread().join();
+			TaskExecutionState taskState =  message.taskExecutionState();
+
+			assertEquals(task.getJobID(), taskState.getJobID());
+			assertEquals(task.getExecutionId(), taskState.getID());
+			assertEquals(state, taskState.getExecutionState());
+
+			if (hasError) {
+				assertNotNull(taskState.getError(getClass().getClassLoader()));
+			} else {
+				assertNull(taskState.getError(getClass().getClassLoader()));
+			}
+		}
+		catch (InterruptedException e) {
+			fail("interrupted");
+		}
+	}
+	
+	private void validateListenerMessage(ExecutionState state, Task task, boolean hasError) {
+		try {
+			// we may have to wait for a bit to give the actors time to receive the message
+			// and put it into the queue
+			TaskMessages.UpdateTaskExecutionState message = 
+					(TaskMessages.UpdateTaskExecutionState) listenerMessages.poll(10, TimeUnit.SECONDS);
+			assertNotNull("There is no additional listener message", message);
 			
-			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			TaskExecutionState taskState =  message.taskExecutionState();
 
-			verify(task).unregisterTask();
+			assertEquals(task.getJobID(), taskState.getJobID());
+			assertEquals(task.getExecutionId(), taskState.getID());
+			assertEquals(state, taskState.getExecutionState());
+			
+			if (hasError) {
+				assertNotNull(taskState.getError(getClass().getClassLoader()));
+			} else {
+				assertNull(taskState.getError(getClass().getClassLoader()));
+			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		catch (InterruptedException e) {
+			fail("interrupted");
 		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
+	//  Mock invokable code
+	// --------------------------------------------------------------------------------------------
 	
 	public static final class TestInvokableCorrect extends AbstractInvokable {
 
@@ -336,16 +733,93 @@ public class TaskTest {
 
 		@Override
 		public void invoke() {}
+
+		@Override
+		public void cancel() throws Exception {
+			fail("This should not be called");
+		}
+	}
+
+	public static final class InvokableWithExceptionInRegisterInOut extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {
+			throw new RuntimeException("test");
+		}
+
+		@Override
+		public void invoke() {}
 	}
 	
-	public static final class TestInvokableWithException extends AbstractInvokable {
+	public static final class InvokableWithExceptionInInvoke extends AbstractInvokable {
+		
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() throws Exception {
+			throw new Exception("test");
+		}
+	}
+
+	public static final class InvokableWithExceptionOnTrigger extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {}
+
+		@Override
+		public void invoke() {
+			awaitLatch.trigger();
+			
+			// make sure that an interrupt does not pull us
+			// out of the wait on the latch early
+			while (true) {
+				try {
+					triggerLatch.await();
+					break;
+				}
+				catch (InterruptedException e) {
+					// ignore the interrupt and retry the await
+				}
+			}
+			
+			throw new RuntimeException("test");
+		}
+	}
+
+	public static abstract class InvokableNonInstantiable extends AbstractInvokable {}
+
+	public static final class InvokableBlockingInRegisterInOut extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {
+			awaitLatch.trigger();
+			
+			try {
+				triggerLatch.await();
+			}
+			catch (InterruptedException e) {
+				throw new RuntimeException();
+			}
+		}
+
+		@Override
+		public void invoke() {}
+	}
+
+	public static final class InvokableBlockingInInvoke extends AbstractInvokable {
 
 		@Override
 		public void registerInputOutput() {}
 
 		@Override
 		public void invoke() throws Exception {
-			throw new Exception("test exception");
+			awaitLatch.trigger();
+			
+			// block forever
+			synchronized (this) {
+				wait();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index bb0c1f9..5318254 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.Messages.Disconnect
-import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, UnregisterTask}
+import org.apache.flink.runtime.messages.TaskMessages.{TaskInFinalState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
@@ -95,8 +95,8 @@ class TestingTaskManager(config: TaskManagerConfiguration,
           }
       }
       
-    case UnregisterTask(executionID) =>
-      super.receiveWithLogMessages(UnregisterTask(executionID))
+    case TaskInFinalState(executionID) =>
+      super.receiveWithLogMessages(TaskInFinalState(executionID))
       waitForRemoval.remove(executionID) match {
         case Some(actors) => for(actor <- actors) actor ! true
         case None =>


Mime
View raw message