flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [36/63] [abbrv] Finalize ExecutionGraph state machine and calls
Date Sun, 21 Sep 2014 02:13:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
new file mode 100644
index 0000000..d97abdb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.junit.Test;
+
+public class ExecutionStateProgressTest {
+
+	@Test
+	public void testAccumulatedStateFinished() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+			
+			AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", vid);
+			ajv.setParallelism(3);
+			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+			
+			ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration());
+			graph.attachJobGraph(Arrays.asList(ajv));
+			
+			setGraphStatus(graph, JobStatus.RUNNING);
+			
+			ExecutionJobVertex ejv = graph.getJobVertex(vid);
+			
+			LibraryCacheManager.register(jid, new String[0]);
+			
+			// mock resources and mock taskmanager
+			TaskOperationProtocol taskManager = getSimpleAcknowledgingTaskmanager();
+			for (ExecutionVertex ee : ejv.getTaskVertices()) {
+				AllocatedSlot slot = getInstance(taskManager).allocateSlot(jid);
+				ee.deployToSlot(slot);
+			}
+			
+			// finish all
+			for (ExecutionVertex ee : ejv.getTaskVertices()) {
+				ee.executionFinished();
+			}
+			
+			assertTrue(ejv.isInFinalState());
+			assertEquals(JobStatus.FINISHED, graph.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 299f7ba..ce1ab30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,10 +26,10 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 
 import java.io.IOException;
-import java.util.NoSuchElementException;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -37,9 +37,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
+
 import org.junit.Test;
+
 import org.mockito.Matchers;
 
 public class ExecutionVertexCancelTest {
@@ -54,13 +54,19 @@ public class ExecutionVertexCancelTest {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
 			vertex.cancel();
 			
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -74,14 +80,20 @@ public class ExecutionVertexCancelTest {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			setVertexState(vertex, ExecutionState2.SCHEDULED);
-			assertEquals(ExecutionState2.SCHEDULED, vertex.getExecutionState());
+			setVertexState(vertex, ExecutionState.SCHEDULED);
+			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 			
 			vertex.cancel();
 			
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -97,51 +109,63 @@ public class ExecutionVertexCancelTest {
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, actions);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 			
-			setVertexState(vertex, ExecutionState2.SCHEDULED);
-			assertEquals(ExecutionState2.SCHEDULED, vertex.getExecutionState());
+			setVertexState(vertex, ExecutionState.SCHEDULED);
+			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 			
 			// task manager mock
 			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
-			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, true), new TaskOperationResult(jid, 0, false));
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(execId, true));
+			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true), new TaskOperationResult(execId, false));
 			
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
+			LibraryCacheManager.register(ejv.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
-			assertEquals(ExecutionState2.DEPLOYING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			
 			vertex.cancel();
 			 
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
 			verify(taskManager, times(0)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
-			verify(taskManager, times(0)).cancelTask(jid, 0);
+			verify(taskManager, times(0)).cancelTask(execId);
 
 			// first action happens (deploy)
 			actions.triggerNextAction();
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			verify(taskManager, times(1)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
 			
 			// the deploy call found itself in canceling after it returned and needs to send a cancel call
 			// the call did not yet execute, so it is still in canceling
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
 			// second action happens (cancel call from cancel function)
 			actions.triggerNextAction();
 			
+			// TaskManager reports back (canceling done)
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+			
 			// should properly set state to cancelled
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			
 			// trigger the correction canceling call
 			actions.triggerNextAction();
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			
-			verify(taskManager, times(2)).cancelTask(jid, 0);
+			verify(taskManager, times(2)).cancelTask(execId);
 			
 			assertTrue(slot.isReleased());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -157,31 +181,33 @@ public class ExecutionVertexCancelTest {
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, actions);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 			
-			setVertexState(vertex, ExecutionState2.SCHEDULED);
-			assertEquals(ExecutionState2.SCHEDULED, vertex.getExecutionState());
+			setVertexState(vertex, ExecutionState.SCHEDULED);
+			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 			
 			// task manager mock
 			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(execId, true));
 			
 			// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
-			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, false), new TaskOperationResult(jid, 0, true));
+			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, false), new TaskOperationResult(execId, true));
 			
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
+			LibraryCacheManager.register(ejv.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
-			assertEquals(ExecutionState2.DEPLOYING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			
 			vertex.cancel();
 			 
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
 			verify(taskManager, times(0)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
-			verify(taskManager, times(0)).cancelTask(jid, 0);
+			verify(taskManager, times(0)).cancelTask(execId);
 
 			// first action happens (deploy)
 			Runnable deployAction = actions.popNextAction();
@@ -191,7 +217,7 @@ public class ExecutionVertexCancelTest {
 			cancelAction.run();
 			
 			// did not find the task, not properly cancelled, stay in canceling
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
 			// deploy action next
 			deployAction.run();
@@ -200,15 +226,23 @@ public class ExecutionVertexCancelTest {
 			
 			// the deploy call found itself in canceling after it returned and needs to send a cancel call
 			// the call did not yet execute, so it is still in canceling
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
 			// trigger the correcting cancel call, should properly set state to cancelled
 			actions.triggerNextAction();
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 			
-			verify(taskManager, times(2)).cancelTask(jid, 0);
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
+			
+			verify(taskManager, times(2)).cancelTask(execId);
 			
 			assertTrue(slot.isReleased());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -222,26 +256,34 @@ public class ExecutionVertexCancelTest {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, true));
+			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true));
 			
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
 			
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			vertex.cancel();
+			vertex.getCurrentExecutionAttempt().cancelingComplete(); // response by task manager once actually canceled
 			
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			
-			verify(taskManager).cancelTask(jid, 0);
+			verify(taskManager).cancelTask(execId);
 			
 			assertTrue(slot.isReleased());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -253,40 +295,45 @@ public class ExecutionVertexCancelTest {
 	public void testRepeatedCancelFromRunning() {
 		try {
 			final JobVertexID jid = new JobVertexID();
-			final ActionQueue actions = new ActionQueue();
-			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, actions);
+			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, true));
+			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, true));
 			
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
 			
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			vertex.cancel();
 			
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
 			vertex.cancel();
 			
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 			
-			actions.triggerNextAction();
+			// callback by TaskManager after canceling completes
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 			
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			
-			try {
-				actions.triggerNextAction();
-				fail("Too many calls sent.");
-			} catch (NoSuchElementException e) {}
+			// check that we did not overdo our cancel calls
+			verify(taskManager, times(1)).cancelTask(execId);
 			
 			assertTrue(slot.isReleased());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -302,22 +349,28 @@ public class ExecutionVertexCancelTest {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, false));
+			when(taskManager.cancelTask(execId)).thenReturn(new TaskOperationResult(execId, false));
 			
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
 			
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			vertex.cancel();
 			
-			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -327,31 +380,36 @@ public class ExecutionVertexCancelTest {
 	
 	@Test
 	public void testCancelCallFails() {
-		// this may happen when the task finished or failed while the call was in progress
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF);
 		
 		try {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
 
 			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.cancelTask(jid, 0)).thenThrow(new IOException("RPC call failed"));
+			when(taskManager.cancelTask(execId)).thenThrow(new IOException("RPC call failed"));
 			
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
 			
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			vertex.cancel();
 			
-			assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 			
 			assertTrue(slot.isReleased());
+			
+			assertNotNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -369,19 +427,19 @@ public class ExecutionVertexCancelTest {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
-			setVertexState(vertex, ExecutionState2.CANCELED);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			setVertexState(vertex, ExecutionState.CANCELED);
 			
-			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			
 			// 1)
 			// scheduling after being created should be tolerated (no exception) because
 			// it can occur as the result of races
 			{
 				DefaultScheduler scheduler = mock(DefaultScheduler.class);
-				vertex.scheduleForExecution(scheduler);
+				vertex.scheduleForExecution(scheduler, false);
 				
-				assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
 			
 			// 2)
@@ -396,7 +454,7 @@ public class ExecutionVertexCancelTest {
 				fail("Method should throw an exception");
 			}
 			catch (IllegalStateException e) {
-				assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
 		}
 		catch (Exception e) {
@@ -407,17 +465,18 @@ public class ExecutionVertexCancelTest {
 	
 	@Test
 	public void testActionsWhileCancelling() {
+		
 		try {
 			final JobVertexID jid = new JobVertexID();
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
 			// scheduling while canceling is an illegal state transition
 			try {
-				ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
-				setVertexState(vertex, ExecutionState2.CANCELING);
+				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+				setVertexState(vertex, ExecutionState.CANCELING);
 				
 				DefaultScheduler scheduler = mock(DefaultScheduler.class);
-				vertex.scheduleForExecution(scheduler);
+				vertex.scheduleForExecution(scheduler, false);
 				fail("Method should throw an exception");
 			}
 			catch (IllegalStateException e) {}
@@ -425,8 +484,8 @@ public class ExecutionVertexCancelTest {
 			
 			// deploying while in canceling state is illegal (should immediately go to canceled)
 			try {
-				ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
-				setVertexState(vertex, ExecutionState2.CANCELING);
+				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+				setVertexState(vertex, ExecutionState.CANCELING);
 				
 				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 				Instance instance = getInstance(taskManager);
@@ -440,19 +499,19 @@ public class ExecutionVertexCancelTest {
 			
 			// fail while canceling
 			{
-				ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+				ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 				
 				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 				Instance instance = getInstance(taskManager);
 				AllocatedSlot slot = instance.allocateSlot(new JobID());
 				
 				setVertexResource(vertex, slot);
-				setVertexState(vertex, ExecutionState2.CANCELING);
+				setVertexState(vertex, ExecutionState.CANCELING);
 				
 				Exception failureCause = new Exception("test exception");
 				
 				vertex.fail(failureCause);
-				assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+				assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 				assertEquals(failureCause, vertex.getFailureCause());
 				
 				assertTrue(slot.isReleased());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 5c61993..a1bb1cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -19,24 +19,32 @@
 package org.apache.flink.runtime.executiongraph;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+
 import static org.junit.Assert.*;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.any;
 
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+
 import org.junit.Test;
+
 import org.mockito.Matchers;
 
 public class ExecutionVertexDeploymentTest {
 
+	
 	@Test
 	public void testDeployCall() {
 		try {
@@ -50,11 +58,11 @@ public class ExecutionVertexDeploymentTest {
 			
 			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
-			assertEquals(ExecutionState2.DEPLOYING, vertex.getExecutionState());
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -62,6 +70,11 @@ public class ExecutionVertexDeploymentTest {
 				fail("Scheduled from wrong state");
 			}
 			catch (IllegalStateException e) {}
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -75,19 +88,22 @@ public class ExecutionVertexDeploymentTest {
 			final JobVertexID jid = new JobVertexID();
 			
 			// mock taskmanager to simply accept the call
-			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
-			
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
 			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(), true));
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -97,6 +113,12 @@ public class ExecutionVertexDeploymentTest {
 			catch (IllegalStateException e) {}
 			
 			verify(taskManager).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+			
+			assertNull(vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -111,16 +133,19 @@ public class ExecutionVertexDeploymentTest {
 			
 			// mock taskmanager to simply accept the call
 			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
+			
 			
 			final Instance instance = getInstance(taskManager);
 			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(), true));
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			// no repeated scheduling
@@ -132,12 +157,12 @@ public class ExecutionVertexDeploymentTest {
 			
 			// wait until the state transition must be done
 			for (int i = 0; i < 100; i++) {
-				if (vertex.getExecutionState() != ExecutionState2.RUNNING) {
+				if (vertex.getExecutionState() != ExecutionState.RUNNING) {
 					Thread.sleep(10);
 				}
 			}
 			
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -147,6 +172,10 @@ public class ExecutionVertexDeploymentTest {
 			catch (IllegalStateException e) {}
 			
 			verify(taskManager).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -156,25 +185,34 @@ public class ExecutionVertexDeploymentTest {
 	
 	@Test
 	public void testDeployFailedSynchronous() {
+		final String ERROR_MESSAGE = "test_failure_error_message";
+		
 		try {
 			final JobVertexID jid = new JobVertexID();
 			
 			// mock taskmanager to simply accept the call
-			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, false, "failed"));
-			
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
 			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(), false, ERROR_MESSAGE));
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
-			assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 			assertNotNull(vertex.getFailureCause());
+			assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -184,32 +222,141 @@ public class ExecutionVertexDeploymentTest {
 	
 	@Test
 	public void testDeployFailedAsynchronously() {
+		final String ERROR_MESSAGE = "test_failure_error_message";
+		
 		try {
 			final JobVertexID jid = new JobVertexID();
 			
 			// mock taskmanager to simply accept the call
-			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
-			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, false, "failed"));
-			
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
 			final Instance instance = getInstance(taskManager);
 			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
 			final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid);
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
 			
-			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(), false, ERROR_MESSAGE));
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			
+			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			// wait until the state transition must be done
 			for (int i = 0; i < 100; i++) {
-				if (vertex.getExecutionState() != ExecutionState2.FAILED) {
+				if (vertex.getExecutionState() != ExecutionState.FAILED) {
 					Thread.sleep(10);
 				}
 			}
 			
-			assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 			assertNotNull(vertex.getFailureCause());
+			assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testFailExternallyDuringDeploy() {
+		
+		try {
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to simply accept the call
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+			
+			Exception testError = new Exception("test error");
+			vertex.fail(testError);
+			
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			assertEquals(testError, vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testFailCallOvertakesDeploymentAnswer() {
+		
+		try {
+			final ActionQueue queue = new ActionQueue();
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to simply accept the call
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, queue);
+			
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
+			
+			// the deployment call succeeds regularly
+			when(taskManager.submitTask(any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(eid, true));
+			
+			// first cancel call does not find a task, second one finds it
+			when(taskManager.cancelTask(any(ExecutionAttemptID.class))).thenReturn(
+					new TaskOperationResult(eid, false), new TaskOperationResult(eid, true));
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			
+			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
+			vertex.deployToSlot(slot);
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+			
+			Exception testError = new Exception("test error");
+			vertex.fail(testError);
+			
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			
+			// now the deploy call returns
+			Runnable deploy = queue.popNextAction();
+			Runnable cancel1 = queue.popNextAction();
+			
+			// cancel call overtakes
+			cancel1.run();
+			deploy.run();
+			
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			
+			// should have sent another cancel call
+			queue.triggerNextAction();
+			
+			assertEquals(testError, vertex.getFailureCause());
+			
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+			
+			// should have received two cancel calls
+			verify(taskManager, times(2)).cancelTask(eid);
+			verify(taskManager, times(1)).submitTask(any(TaskDeploymentDescriptor.class));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
new file mode 100644
index 0000000..43d6547
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getJobVertexNotExecuting;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+
+import org.junit.Test;
+
+import org.mockito.Matchers;
+
+public class ExecutionVertexSchedulingTest {
+	
+	@Test
+	public void testSlotReleasedWhenScheduledImmediately() {
+		
+		try {
+			// a slot that cannot be deployed to
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			slot.cancel();
+			assertFalse(slot.isReleased());
+			
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			
+			DefaultScheduler scheduler = mock(DefaultScheduler.class);
+			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			// try to deploy to the slot
+			vertex.scheduleForExecution(scheduler, false);
+			
+			// will have failed
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			assertTrue(slot.isReleased());
+			
+			verify(taskManager, times(0)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSlotReleasedWhenScheduledQueued() {
+
+		try {
+			// a slot that cannot be deployed to
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			slot.cancel();
+			assertFalse(slot.isReleased());
+			
+			final SlotAllocationFuture future = new SlotAllocationFuture();
+			
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			
+			DefaultScheduler scheduler = mock(DefaultScheduler.class);
+			when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			// try to deploy to the slot
+			vertex.scheduleForExecution(scheduler, true);
+			
+			// the future does not have a slot yet
+			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
+			
+			future.setSlot(slot);
+			
+			// will have failed
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			assertTrue(slot.isReleased());
+			
+			verify(taskManager, times(0)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testScheduleToDeploy() {
+		try {
+			// a slot that can be deployed to (it is not cancelled)
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(new JobVertexID());
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
+			
+			DefaultScheduler scheduler = mock(DefaultScheduler.class);
+			when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+			
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+
+			// try to deploy to the slot
+			vertex.scheduleForExecution(scheduler, false);
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 0ea00d3..5a73253 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,10 +66,10 @@ public class PointwisePatternTest {
 		
 		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertEquals(1, inEdges.length);
 			
 			assertEquals(ev.getParallelSubtaskIndex(), inEdges[0].getSource().getPartition());
@@ -101,10 +101,10 @@ public class PointwisePatternTest {
 		
 		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertEquals(2, inEdges.length);
 			
 			assertEquals(ev.getParallelSubtaskIndex() * 2, inEdges[0].getSource().getPartition());
@@ -137,10 +137,10 @@ public class PointwisePatternTest {
 		
 		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertEquals(3, inEdges.length);
 			
 			assertEquals(ev.getParallelSubtaskIndex() * 3, inEdges[0].getSource().getPartition());
@@ -174,10 +174,10 @@ public class PointwisePatternTest {
 		
 		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertEquals(1, inEdges.length);
 			
 			assertEquals(ev.getParallelSubtaskIndex() / 2, inEdges[0].getSource().getPartition());
@@ -209,10 +209,10 @@ public class PointwisePatternTest {
 		
 		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertEquals(1, inEdges.length);
 			
 			assertEquals(ev.getParallelSubtaskIndex() / 7, inEdges[0].getSource().getPartition());
@@ -266,10 +266,10 @@ public class PointwisePatternTest {
 		
 		int[] timesUsed = new int[lowDop];
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertEquals(1, inEdges.length);
 			
 			
@@ -312,13 +312,13 @@ public class PointwisePatternTest {
 		
 		int[] timesUsed = new int[highDop];
 		
-		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+		for (ExecutionVertex ev : target.getTaskVertices()) {
 			assertEquals(1, ev.getNumberOfInputs());
 			
-			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			ExecutionEdge[] inEdges = ev.getInputEdges(0);
 			assertTrue(inEdges.length >= factor && inEdges.length <= factor + delta);
 			
-			for (ExecutionEdge2 ee : inEdges) {
+			for (ExecutionEdge ee : inEdges) {
 				timesUsed[ee.getSource().getPartition()]++;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java
index f4dd731..03ea8df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/LineReaderTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.fs;
 
 import static org.junit.Assert.assertEquals;
@@ -29,7 +28,6 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
index c10a5ef..67c0805 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 
 import java.net.InetAddress;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -69,8 +69,8 @@ public class AllocatedSlotTest {
 	@Test
 	public void testSetExecutionVertex() {
 		try {
-			ExecutionVertex2 ev = mock(ExecutionVertex2.class);
-			ExecutionVertex2 ev_2 = mock(ExecutionVertex2.class);
+			Execution ev = mock(Execution.class);
+			Execution ev_2 = mock(Execution.class);
 			
 			// assign to alive slot
 			{
@@ -111,7 +111,7 @@ public class AllocatedSlotTest {
 	@Test
 	public void testReleaseCancelsVertex() {
 		try {
-			ExecutionVertex2 ev = mock(ExecutionVertex2.class);
+			Execution ev = mock(Execution.class);
 			
 			AllocatedSlot slot = getSlot();
 			assertTrue(slot.setExecutedVertex(ev));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 2583771..9a7622e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class LocalInstanceManagerTest {
@@ -152,7 +151,9 @@ public class LocalInstanceManagerTest {
 	private static final class MockRPC implements JobManagerProtocol {
 
 		@Override
-		public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
+		public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+			return false;
+		}
 
 		@Override
 		public boolean sendHeartbeat(InstanceID taskManagerId) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 8c36200..5837b81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.EOFException;
@@ -27,9 +26,9 @@ import java.util.Random;
 
 import org.junit.Assert;
 
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -41,11 +40,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-
 /**
  * Integration test case for the I/O manager.
- *
- *
  */
 public class IOManagerITCase {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index b8aac10..c7e3463 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -53,7 +53,7 @@ public class JobManagerITCase {
 			
 			final JobGraph jobGraph = new JobGraph("Test Job", vertex);
 			
-			JobManager jm = startJobManager();
+			JobManager jm = startJobManager(3);
 			try {
 				
 				// we need to register the job at the library cache manager (with no libraries)
@@ -104,11 +104,12 @@ public class JobManagerITCase {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static final JobManager startJobManager() throws Exception {
+	private static final JobManager startJobManager(int numSlots) throws Exception {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
 		
 		GlobalConfiguration.includeConfiguration(cfg);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
deleted file mode 100644
index 835af95..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-public class JobManagerTest {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 10fca8a..b092312 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.areAllDistinct;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getDummyVertex;
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getDummyTask;
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 
@@ -120,17 +120,17 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(5, scheduler.getNumberOfAvailableSlots());
 			
 			// schedule something into all slots
-			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
-			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
-			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
-			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
-			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
+			AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
 			
 			// the slots should all be different
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
 			
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
+				scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
 				fail("Scheduler accepted scheduling request without available resource.");
 			}
 			catch (NoResourceAvailableException e) {
@@ -143,8 +143,8 @@ public class SchedulerIsolatedTasksTest {
 			assertEquals(2, scheduler.getNumberOfAvailableSlots());
 			
 			// now we can schedule some more slots
-			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
-			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
+			AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+			AllocatedSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
 			
 			assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
 			
@@ -242,7 +242,7 @@ public class SchedulerIsolatedTasksTest {
 			disposeThread.start();
 			
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-				SlotAllocationFuture future = scheduler.scheduleQueued(new ScheduledUnit(getDummyVertex()));
+				SlotAllocationFuture future = scheduler.scheduleQueued(new ScheduledUnit(getDummyTask()));
 				future.setFutureAction(action);
 				allAllocatedSlots.add(future);
 			}
@@ -281,11 +281,11 @@ public class SchedulerIsolatedTasksTest {
 			scheduler.newInstanceAvailable(i3);
 			
 			List<AllocatedSlot> slots = new ArrayList<AllocatedSlot>();
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex())));
-			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex())));
+			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
+			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
+			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
+			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
+			slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
 			
 			i2.markDead();
 			
@@ -306,7 +306,7 @@ public class SchedulerIsolatedTasksTest {
 			
 			// cannot get another slot, since all instances are dead
 			try {
-				scheduler.scheduleImmediately(new ScheduledUnit(getDummyVertex()));
+				scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
 				fail("Scheduler served a slot from a dead instance");
 			}
 			catch (NoResourceAvailableException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 78a397e..d2e7598 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -28,7 +28,8 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -67,17 +68,32 @@ public class SchedulerTestUtils {
 	}
 	
 	
-	public static ExecutionVertex2 getDummyVertex() {
-		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+	public static Execution getDummyTask() {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobId()).thenReturn(new JobID());
+		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		
+		Execution execution = mock(Execution.class);
+		when(execution.getVertex()).thenReturn(vertex);
+		
+		return execution;
+	}
+	
+	public static Execution getTestVertex(Iterable<Instance> preferredLocations) {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
+		when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
-		return vertex;
+		Execution execution = mock(Execution.class);
+		when(execution.getVertex()).thenReturn(vertex);
+		
+		return execution;
 	}
 	
-	public static ExecutionVertex2 getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
-		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
 		when(vertex.getPreferredLocations()).thenReturn(null);
 		when(vertex.getJobId()).thenReturn(new JobID());
@@ -86,11 +102,14 @@ public class SchedulerTestUtils {
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
-		return vertex;
+		Execution execution = mock(Execution.class);
+		when(execution.getVertex()).thenReturn(vertex);
+		
+		return execution;
 	}
 	
-	public static ExecutionVertex2 getTestVertexWithLocation(JobVertexID jid, int taskIndex, int numTasks, Instance... locations) {
-		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
+	public static Execution getTestVertexWithLocation(JobVertexID jid, int taskIndex, int numTasks, Instance... locations) {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
 		when(vertex.getPreferredLocations()).thenReturn(Arrays.asList(locations));
 		when(vertex.getJobId()).thenReturn(new JobID());
@@ -99,17 +118,10 @@ public class SchedulerTestUtils {
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
-		return vertex;
-	}
-	
-	public static ExecutionVertex2 getTestVertex(Iterable<Instance> preferredLocations) {
-		ExecutionVertex2 vertex = mock(ExecutionVertex2.class);
-		
-		when(vertex.getPreferredLocations()).thenReturn(preferredLocations);
-		when(vertex.getJobId()).thenReturn(new JobID());
-		when(vertex.toString()).thenReturn("TEST-VERTEX");
+		Execution execution = mock(Execution.class);
+		when(execution.getVertex()).thenReturn(vertex);
 		
-		return vertex;
+		return execution;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 7ae3d40..7a47510 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.hash;
 
 import static org.junit.Assert.assertEquals;
@@ -53,10 +52,10 @@ import org.apache.flink.types.Key;
 import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class HashTableITCase {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 9a18742..666ecd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -48,12 +48,11 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 public class CombiningUnilateralSortMergerITCase {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMergerITCase.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
index 91dc5b3..6e5ab3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.taskmanager;
 
 import static org.junit.Assert.*;
 
-import org.apache.flink.runtime.execution.ExecutionState2;
+import java.io.IOException;
+
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -33,11 +35,11 @@ public class TaskExecutionStateTest {
 		try {
 			final JobID jid = new JobID();
 			final ExecutionAttemptID executionId = new ExecutionAttemptID();
-			final ExecutionState2 state = ExecutionState2.RUNNING;
-			final String description = "some test description";
+			final ExecutionState state = ExecutionState.RUNNING;
+			final Throwable error = new RuntimeException("some test error message");
 			
-			TaskExecutionState s1 = new TaskExecutionState(jid, executionId, state, description);
-			TaskExecutionState s2 = new TaskExecutionState(jid, executionId, state, description);
+			TaskExecutionState s1 = new TaskExecutionState(jid, executionId, state, error);
+			TaskExecutionState s2 = new TaskExecutionState(jid, executionId, state, error);
 			
 			assertEquals(s1.hashCode(), s2.hashCode());
 			assertEquals(s1, s2);
@@ -53,16 +55,23 @@ public class TaskExecutionStateTest {
 		try {
 			final JobID jid = new JobID();
 			final ExecutionAttemptID executionId = new ExecutionAttemptID();
-			final ExecutionState2 state = ExecutionState2.DEPLOYING;
-			final String description = "foo bar";
+			final ExecutionState state = ExecutionState.DEPLOYING;
+			final Throwable error = new IOException("fubar");
+			
+			TaskExecutionState original1 = new TaskExecutionState(jid, executionId, state, error);
+			TaskExecutionState original2 = new TaskExecutionState(jid, executionId, state);
+			
+			TaskExecutionState writableCopy1 = CommonTestUtils.createCopyWritable(original1);
+			TaskExecutionState writableCopy2 = CommonTestUtils.createCopyWritable(original2);
 			
-			TaskExecutionState original = new TaskExecutionState(jid, executionId, state, description);
+			TaskExecutionState javaSerCopy1 = CommonTestUtils.createCopySerializable(original1);
+			TaskExecutionState javaSerCopy2 = CommonTestUtils.createCopySerializable(original2);
 			
-			TaskExecutionState writableCopy = CommonTestUtils.createCopyWritable(original);
-			TaskExecutionState javaSerCopy = CommonTestUtils.createCopySerializable(original);
+			assertEquals(original1, writableCopy1);
+			assertEquals(original1, javaSerCopy1);
 			
-			assertEquals(original, writableCopy);
-			assertEquals(original, javaSerCopy);
+			assertEquals(original2, writableCopy2);
+			assertEquals(original2, javaSerCopy2);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 3eb4a61..13261d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
 import org.apache.flink.runtime.io.network.api.RecordReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -78,7 +79,6 @@ public class TaskManagerTest {
 			TaskOperationResult result = tm.submitTask(tdd);
 			assertTrue(result.isSuccess());
 			assertEquals(eid, result.getExecutionId());
-			assertEquals(vid, result.getVertexId());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -126,8 +126,6 @@ public class TaskManagerTest {
 			assertTrue(result2.isSuccess());
 			assertEquals(eid1, result1.getExecutionId());
 			assertEquals(eid2, result2.getExecutionId());
-			assertEquals(vid1, result1.getVertexId());
-			assertEquals(vid2, result2.getVertexId());
 			
 			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
 			assertEquals(2, tasks.size());
@@ -137,24 +135,24 @@ public class TaskManagerTest {
 			assertNotNull(t1);
 			assertNotNull(t2);
 			
-			assertEquals(ExecutionState2.RUNNING, t1.getExecutionState());
-			assertEquals(ExecutionState2.RUNNING, t2.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, t1.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 			
 			// cancel one task
-			assertTrue(tm.cancelTask(vid1, 1, eid1).isSuccess());
+			assertTrue(tm.cancelTask(eid1).isSuccess());
 			t1.getEnvironment().getExecutingThread().join();
-			assertEquals(ExecutionState2.CANCELED, t1.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
 			
 			tasks = tm.getAllRunningTasks();
 			assertEquals(1, tasks.size());
 			
 			// try to cancel a non existing task
-			assertFalse(tm.cancelTask(vid1, 1, eid1).isSuccess());
+			assertFalse(tm.cancelTask(eid1).isSuccess());
 			
 			// cancel the second task
-			assertTrue(tm.cancelTask(vid2, 2, eid2).isSuccess());
+			assertTrue(tm.cancelTask(eid2).isSuccess());
 			t2.getEnvironment().getExecutingThread().join();
-			assertEquals(ExecutionState2.CANCELED, t2.getExecutionState());
+			assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
 			
 			tasks = tm.getAllRunningTasks();
 			assertEquals(0, tasks.size());
@@ -162,6 +160,8 @@ public class TaskManagerTest {
 			// the class loaders should be de-registered
 			assertNull(LibraryCacheManager.getClassLoader(jid1));
 			assertNull(LibraryCacheManager.getClassLoader(jid2));
+			
+			assertNetworkResourcesReleased(tm);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -205,6 +205,7 @@ public class TaskManagerTest {
 			
 			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
 			assertEquals(0, tasks.size());
+			assertNetworkResourcesReleased(tm);
 			
 			// the class loaders should be de-registered
 			assertNull(LibraryCacheManager.getClassLoader(jid));
@@ -262,8 +263,6 @@ public class TaskManagerTest {
 			assertTrue(result2.isSuccess());
 			assertEquals(eid1, result1.getExecutionId());
 			assertEquals(eid2, result2.getExecutionId());
-			assertEquals(vid1, result1.getVertexId());
-			assertEquals(vid2, result2.getVertexId());
 			
 			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
 			
@@ -278,14 +277,17 @@ public class TaskManagerTest {
 				t2.getEnvironment().getExecutingThread().join();
 			}
 			
-			assertEquals(ExecutionState2.FINISHED, t1.getExecutionState());
-			assertEquals(ExecutionState2.FINISHED, t2.getExecutionState());
+			assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
+			assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
 			
 			tasks = tm.getAllRunningTasks();
 			assertEquals(0, tasks.size());
 			
 			// the class loaders should be de-registered
 			assertNull(LibraryCacheManager.getClassLoader(jid));
+			
+			// make sure that the global buffer pool has all buffers back
+			assertNetworkResourcesReleased(tm);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -293,9 +295,97 @@ public class TaskManagerTest {
 		}
 	}
 	
+	@Test
+	public void testCancellingDependentAndStateUpdateFails() {
+		
+		// this test creates two tasks. the sender sends data, but fails to send the
+		// state update back to the job manager.
+		// the second one (the receiver) blocks until it is canceled
+		
+		try {
+			JobID jid = new JobID();
+			
+			JobVertexID vid1 = new JobVertexID();
+			JobVertexID vid2 = new JobVertexID();
+			
+			ExecutionAttemptID eid1 = new ExecutionAttemptID();
+			ExecutionAttemptID eid2 = new ExecutionAttemptID();
+			
+			ChannelID senderId = new ChannelID();
+			ChannelID receiverId = new ChannelID();
+			
+			JobManager jobManager = getJobManagerMockBase();
+			when(jobManager.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(false);
+			when(jobManager.lookupConnectionInfo(Matchers.any(InstanceConnectionInfo.class), Matchers.eq(jid), Matchers.eq(senderId)))
+				.thenReturn(ConnectionInfoLookupResponse.createReceiverFoundAndReady(receiverId));
+			
+			TaskManager tm = createTaskManager(jobManager);
+			
+			ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(senderId, receiverId);
+			
+			TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
+					new Configuration(), new Configuration(), Sender.class.getName(),
+					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))), 
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					new String[0], 0);
+			
+			TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
+					new Configuration(), new Configuration(), ReceiverBlocking.class.getName(),
+					Collections.<GateDeploymentDescriptor>emptyList(),
+					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
+					new String[0], 0);
+			
+			// register the job twice (for two tasks) at the lib cache
+			LibraryCacheManager.register(jid, new String[0]);
+			LibraryCacheManager.register(jid, new String[0]);
+			assertNotNull(LibraryCacheManager.getClassLoader(jid));
+			
+			// deploy receiver before sender, so the target is online when the sender requests the connection info
+			TaskOperationResult result2 = tm.submitTask(tdd2);
+			TaskOperationResult result1 = tm.submitTask(tdd1);
+			
+			assertTrue(result1.isSuccess());
+			assertTrue(result2.isSuccess());
+			
+			Map<ExecutionAttemptID, Task> tasks = tm.getAllRunningTasks();
+			
+			Task t1 = tasks.get(eid1);
+			Task t2 = tasks.get(eid2);
+			
+			// cancel task 2. task one should either fail, or be done
+			tm.cancelTask(eid2);
+			
+			// wait until the second task is canceled
+			if (t2 != null) {
+				t2.getEnvironment().getExecutingThread().join();
+			}
+			
+			if (t1 != null) {
+				if (t1.getExecutionState() == ExecutionState.RUNNING) {
+					tm.cancelTask(eid1);
+				}
+				t1.getEnvironment().getExecutingThread().join();
+			}
+			
+			// the task that failed to send the finished state should be removed as well
+			assertEquals(0, tasks.size());
+			assertNetworkResourcesReleased(tm);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	private static void assertNetworkResourcesReleased(TaskManager tm) {
+		GlobalBufferPool gbp = tm.getChannelManager().getGlobalBufferPool();
+		assertEquals(gbp.numBuffers(), gbp.numAvailableBuffers());
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
-	public static JobManager getJobManagerMockBase() {
+	public static JobManager getJobManagerMockBase() throws Exception {
 		JobManager jm = mock(JobManager.class);
 		
 		final InstanceID iid = new InstanceID();
@@ -305,6 +395,8 @@ public class TaskManagerTest {
 		
 		when(jm.sendHeartbeat(iid)).thenReturn(true);
 		
+		when(jm.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(true);
+		
 		return jm;
 	}
 	
@@ -382,4 +474,19 @@ public class TaskManagerTest {
 			}
 		}
 	}
+	
+	public static final class ReceiverBlocking extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {
+			new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			synchronized(this) {
+				wait();
+			}
+		}
+	}
 }


Mime
View raw message