flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/63] [abbrv] Refactor job graph construction to incremental attachment based
Date Sun, 21 Sep 2014 02:12:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
new file mode 100644
index 0000000..299f7ba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.apache.flink.util.LogUtils;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+/**
+ * Tests for the cancel() logic of {@code ExecutionVertex2}: canceling out of each
+ * execution state (CREATED, SCHEDULED, DEPLOYING, RUNNING), races between concurrent
+ * deploy and cancel RPCs, and which actions are legal after or during cancellation.
+ */
+public class ExecutionVertexCancelTest {
+
+	// --------------------------------------------------------------------------------------------
+	//  Canceling in different states
+	// --------------------------------------------------------------------------------------------
+	
+	/** Canceling a vertex still in CREATED must go directly to CANCELED (nothing was deployed). */
+	@Test
+	public void testCancelFromCreated() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** Canceling from SCHEDULED (not yet deployed) must also go directly to CANCELED. */
+	@Test
+	public void testCancelFromScheduled() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			setVertexState(vertex, ExecutionState2.SCHEDULED);
+			assertEquals(ExecutionState2.SCHEDULED, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Cancel is called while the deploy RPC is still queued, and the calls do NOT overtake
+	 * each other: the deploy executes first, then the cancel call. The mocked cancelTask
+	 * therefore succeeds on the first attempt and reports "not found" on the corrective
+	 * second call that the deploy path issues after finding itself in CANCELING.
+	 */
+	@Test
+	public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ActionQueue actions = new ActionQueue();
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, actions);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			setVertexState(vertex, ExecutionState2.SCHEDULED);
+			assertEquals(ExecutionState2.SCHEDULED, vertex.getExecutionState());
+			
+			// task manager mock: submit succeeds; cancel succeeds first, then reports "not found"
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
+			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, true), new TaskOperationResult(jid, 0, false));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			vertex.deployToSlot(slot);
+			
+			assertEquals(ExecutionState2.DEPLOYING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			 
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			// no RPC has actually been executed yet — both calls are queued in the action queue
+			verify(taskManager, times(0)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+			verify(taskManager, times(0)).cancelTask(jid, 0);
+
+			// first action happens (deploy)
+			actions.triggerNextAction();
+			verify(taskManager, times(1)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+			
+			// the deploy call found itself in canceling after it returned and needs to send a cancel call
+			// the call did not yet execute, so it is still in canceling
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			// second action happens (cancel call from cancel function)
+			actions.triggerNextAction();
+			
+			// should properly set state to cancelled
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			
+			// trigger the correction canceling call
+			actions.triggerNextAction();
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			
+			verify(taskManager, times(2)).cancelTask(jid, 0);
+			
+			assertTrue(slot.isReleased());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Cancel is called while the deploy RPC is still queued, and the cancel RPC OVERTAKES
+	 * the deploy RPC: the first cancel finds no task (failure, stays CANCELING), the deploy
+	 * then lands, and the corrective second cancel call finally cancels the task.
+	 */
+	@Test
+	public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ActionQueue actions = new ActionQueue();
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, actions);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			setVertexState(vertex, ExecutionState2.SCHEDULED);
+			assertEquals(ExecutionState2.SCHEDULED, vertex.getExecutionState());
+			
+			// task manager mock
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
+			
+			// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
+			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, false), new TaskOperationResult(jid, 0, true));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			vertex.deployToSlot(slot);
+			
+			assertEquals(ExecutionState2.DEPLOYING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			 
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			// no RPC has actually been executed yet — both calls are queued in the action queue
+			verify(taskManager, times(0)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+			verify(taskManager, times(0)).cancelTask(jid, 0);
+
+			// pop both queued actions so we can run them in the "overtaking" order
+			Runnable deployAction = actions.popNextAction();
+			Runnable cancelAction = actions.popNextAction();
+			
+			// cancel call first
+			cancelAction.run();
+			
+			// did not find the task, not properly cancelled, stay in canceling
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			// deploy action next
+			deployAction.run();
+			
+			verify(taskManager, times(1)).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+			
+			// the deploy call found itself in canceling after it returned and needs to send a cancel call
+			// the call did not yet execute, so it is still in canceling
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			// trigger the correcting cancel call, should properly set state to cancelled
+			actions.triggerNextAction();
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			
+			verify(taskManager, times(2)).cancelTask(jid, 0);
+			
+			assertTrue(slot.isReleased());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** A successful cancel RPC from RUNNING moves the vertex to CANCELED and releases its slot. */
+	@Test
+	public void testCancelFromRunning() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, true));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+
+			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexResource(vertex, slot);
+			
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			
+			verify(taskManager).cancelTask(jid, 0);
+			
+			assertTrue(slot.isReleased());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * Calling cancel() twice from RUNNING must keep the vertex in CANCELING and enqueue only
+	 * a single cancel RPC; a second queued call would be a bug (asserted by expecting
+	 * NoSuchElementException from the drained action queue).
+	 */
+	@Test
+	public void testRepeatedCancelFromRunning() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ActionQueue actions = new ActionQueue();
+			final ExecutionJobVertex ejv = getJobVertexExecutingTriggered(jid, actions);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, true));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+
+			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexResource(vertex, slot);
+			
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			// repeated cancel must be a no-op
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+			
+			actions.triggerNextAction();
+			
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			
+			// exactly one cancel call may have been queued
+			try {
+				actions.triggerNextAction();
+				fail("Too many calls sent.");
+			} catch (NoSuchElementException e) {}
+			
+			assertTrue(slot.isReleased());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * If the cancel RPC reports the task as not found (it finished or failed concurrently),
+	 * the vertex remains in CANCELING.
+	 */
+	@Test
+	public void testCancelFromRunningDidNotFindTask() {
+		// this may happen when the task finished or failed while the call was in progress
+		
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.cancelTask(jid, 0)).thenReturn(new TaskOperationResult(jid, 0, false));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+
+			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexResource(vertex, slot);
+			
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.CANCELING, vertex.getExecutionState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** If the cancel RPC itself throws, the vertex goes to FAILED and the slot is released. */
+	@Test
+	public void testCancelCallFails() {
+		// this may happen when the task finished or failed while the call was in progress
+		// logging is turned off because the RPC failure provoked below is expected
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF);
+		
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+
+			final TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.cancelTask(jid, 0)).thenThrow(new IOException("RPC call failed"));
+			
+			Instance instance = getInstance(taskManager);
+			AllocatedSlot slot = instance.allocateSlot(new JobID());
+
+			setVertexState(vertex, ExecutionState2.RUNNING);
+			setVertexResource(vertex, slot);
+			
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			vertex.cancel();
+			
+			assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+			
+			assertTrue(slot.isReleased());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Actions after a vertex has been canceled or while canceling
+	// --------------------------------------------------------------------------------------------
+	
+	/** After the vertex is CANCELED, scheduling is tolerated (race) but deploying must throw. */
+	@Test
+	public void testScheduleOrDeployAfterCancel() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			setVertexState(vertex, ExecutionState2.CANCELED);
+			
+			assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			
+			// 1)
+			// scheduling after being created should be tolerated (no exception) because
+			// it can occur as the result of races
+			{
+				DefaultScheduler scheduler = mock(DefaultScheduler.class);
+				vertex.scheduleForExecution(scheduler);
+				
+				assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			}
+			
+			// 2)
+			// deploying after canceling from CREATED needs to raise an exception, because
+			// the scheduler (or any caller) needs to know that the slot should be released
+			try {
+				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+				Instance instance = getInstance(taskManager);
+				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				
+				vertex.deployToSlot(slot);
+				fail("Method should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				assertEquals(ExecutionState2.CANCELED, vertex.getExecutionState());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** While in CANCELING: scheduling and deploying are illegal; fail() is allowed and releases the slot. */
+	@Test
+	public void testActionsWhileCancelling() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			// scheduling while canceling is an illegal state transition
+			try {
+				ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+				setVertexState(vertex, ExecutionState2.CANCELING);
+				
+				DefaultScheduler scheduler = mock(DefaultScheduler.class);
+				vertex.scheduleForExecution(scheduler);
+				fail("Method should throw an exception");
+			}
+			catch (IllegalStateException e) {}
+			
+			
+			// deploying while in canceling state is illegal (should immediately go to canceled)
+			try {
+				ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+				setVertexState(vertex, ExecutionState2.CANCELING);
+				
+				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+				Instance instance = getInstance(taskManager);
+				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				
+				vertex.deployToSlot(slot);
+				fail("Method should throw an exception");
+			}
+			catch (IllegalStateException e) {}
+			
+			
+			// fail while canceling
+			{
+				ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+				
+				TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+				Instance instance = getInstance(taskManager);
+				AllocatedSlot slot = instance.allocateSlot(new JobID());
+				
+				setVertexResource(vertex, slot);
+				setVertexState(vertex, ExecutionState2.CANCELING);
+				
+				Exception failureCause = new Exception("test exception");
+				
+				vertex.fail(failureCause);
+				assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+				assertEquals(failureCause, vertex.getFailureCause());
+				
+				assertTrue(slot.isReleased());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
new file mode 100644
index 0000000..5c61993
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+/**
+ * Tests for deploying an {@code ExecutionVertex2} into an allocated slot: the state
+ * transitions for synchronous and asynchronous task-manager answers, rejection of
+ * repeated deployment, and failure handling when the submit call reports an error.
+ */
+public class ExecutionVertexDeploymentTest {
+
+	/** Deploying from CREATED moves to DEPLOYING; a second deploy into the same slot must throw. */
+	@Test
+	public void testDeployCall() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to simply accept the call
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexNotExecuting(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			assertEquals(ExecutionState2.DEPLOYING, vertex.getExecutionState());
+			
+			// no repeated scheduling
+			try {
+				vertex.deployToSlot(slot);
+				fail("Scheduled from wrong state");
+			}
+			catch (IllegalStateException e) {}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** A synchronous, successful submitTask answer moves the vertex straight to RUNNING. */
+	@Test
+	public void testDeployWithSynchronousAnswer() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to simply accept the call
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			// no repeated scheduling
+			try {
+				vertex.deployToSlot(slot);
+				fail("Scheduled from wrong state");
+			}
+			catch (IllegalStateException e) {}
+			
+			verify(taskManager).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/**
+	 * An asynchronous, successful submitTask answer eventually moves the vertex to RUNNING;
+	 * repeated deploys must throw both before and after the transition completes.
+	 */
+	@Test
+	public void testDeployWithAsynchronousAnswer() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to simply accept the call
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, true));
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			
+			// no repeated scheduling
+			try {
+				vertex.deployToSlot(slot);
+				fail("Scheduled from wrong state");
+			}
+			catch (IllegalStateException e) {}
+			
+			// wait until the state transition must be done (polls up to 100 x 10ms)
+			for (int i = 0; i < 100; i++) {
+				if (vertex.getExecutionState() != ExecutionState2.RUNNING) {
+					Thread.sleep(10);
+				}
+			}
+			
+			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			// no repeated scheduling
+			try {
+				vertex.deployToSlot(slot);
+				fail("Scheduled from wrong state");
+			}
+			catch (IllegalStateException e) {}
+			
+			verify(taskManager).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** A synchronous failure answer moves the vertex to FAILED with a non-null failure cause. */
+	@Test
+	public void testDeployFailedSynchronous() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to reject the submission
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, false, "failed"));
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingSynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			
+			assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+			assertNotNull(vertex.getFailureCause());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	/** An asynchronous failure answer eventually moves the vertex to FAILED with a non-null cause. */
+	@Test
+	public void testDeployFailedAsynchronously() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+			
+			// mock taskmanager to reject the submission
+			TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenReturn(new TaskOperationResult(jid, 0, false, "failed"));
+			
+			final Instance instance = getInstance(taskManager);
+			final AllocatedSlot slot = instance.allocateSlot(new JobID());
+			
+			final ExecutionJobVertex ejv = getJobVertexExecutingAsynchronously(jid);
+			
+			final ExecutionVertex2 vertex = new ExecutionVertex2(ejv, 0, new IntermediateResult[0]);
+			
+			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			
+			// wait until the state transition must be done (polls up to 100 x 10ms)
+			for (int i = 0; i < 100; i++) {
+				if (vertex.getExecutionState() != ExecutionState2.FAILED) {
+					Thread.sleep(10);
+				}
+			}
+			
+			assertEquals(ExecutionState2.FAILED, vertex.getExecutionState());
+			assertNotNull(vertex.getFailureCause());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input1Output.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input1Output.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input1Output.java
deleted file mode 100644
index 3e500fe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input1Output.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class ForwardTask1Input1Output extends AbstractInvokable {
-
-	private RecordReader<StringRecord> input = null;
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-		this.output.initializeSerializers();
-
-		while (this.input.hasNext()) {
-
-			StringRecord s = input.next();
-			this.output.emit(s);
-		}
-
-		this.output.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input2Outputs.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input2Outputs.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input2Outputs.java
deleted file mode 100644
index 162da14..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask1Input2Outputs.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class ForwardTask1Input2Outputs extends AbstractInvokable {
-
-	private RecordReader<StringRecord> input = null;
-
-	private RecordWriter<StringRecord> output1 = null;
-
-	private RecordWriter<StringRecord> output2 = null;
-
-	@Override
-	public void invoke() throws Exception {
-
-		this.output1.initializeSerializers();
-		this.output2.initializeSerializers();
-
-		while (this.input.hasNext()) {
-
-			StringRecord s = input.next();
-			this.output1.emit(s);
-			this.output2.emit(s);
-		}
-
-		this.output1.flush();
-		this.output2.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output1 = new RecordWriter<StringRecord>(this);
-		this.output2 = new RecordWriter<StringRecord>(this);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask2Inputs1Output.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask2Inputs1Output.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask2Inputs1Output.java
deleted file mode 100644
index 8035d92..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ForwardTask2Inputs1Output.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class ForwardTask2Inputs1Output extends AbstractInvokable {
-
-	private RecordReader<StringRecord> input1 = null;
-
-	private RecordReader<StringRecord> input2 = null;
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void invoke() throws Exception {
-		this.output.initializeSerializers();
-
-		while (this.input1.hasNext()) {
-
-			StringRecord s = input1.next();
-			this.output.emit(s);
-		}
-
-		while (this.input2.hasNext()) {
-
-			try {
-				StringRecord s = input2.next();
-				this.output.emit(s);
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-		}
-
-		this.output.flush();
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
new file mode 100644
index 0000000..0ea00d3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobID;
+
+
+public class PointwisePatternTest {
+
+	private final JobID jobId = new JobID();
+	private final String jobName = "Test Job Sample Name";
+	private final Configuration cfg = new Configuration();
+	
+	@Test
+	public void testNToN() {
+		final int N = 23;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(N);
+		v2.setParallelism(N);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertEquals(1, inEdges.length);
+			
+			assertEquals(ev.getParallelSubtaskIndex(), inEdges[0].getSource().getPartition());
+		}
+	}
+	
+	@Test
+	public void test2NToN() {
+		final int N = 17;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(2 * N);
+		v2.setParallelism(N);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertEquals(2, inEdges.length);
+			
+			assertEquals(ev.getParallelSubtaskIndex() * 2, inEdges[0].getSource().getPartition());
+			assertEquals(ev.getParallelSubtaskIndex() * 2 + 1, inEdges[1].getSource().getPartition());
+		}
+	}
+	
+	@Test
+	public void test3NToN() {
+		final int N = 17;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(3 * N);
+		v2.setParallelism(N);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertEquals(3, inEdges.length);
+			
+			assertEquals(ev.getParallelSubtaskIndex() * 3, inEdges[0].getSource().getPartition());
+			assertEquals(ev.getParallelSubtaskIndex() * 3 + 1, inEdges[1].getSource().getPartition());
+			assertEquals(ev.getParallelSubtaskIndex() * 3 + 2, inEdges[2].getSource().getPartition());
+		}
+	}
+	
+	@Test
+	public void testNTo2N() {
+		final int N = 41;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(N);
+		v2.setParallelism(2 * N);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertEquals(1, inEdges.length);
+			
+			assertEquals(ev.getParallelSubtaskIndex() / 2, inEdges[0].getSource().getPartition());
+		}
+	}
+	
+	@Test
+	public void testNTo7N() {
+		final int N = 11;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(N);
+		v2.setParallelism(7 * N);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertEquals(1, inEdges.length);
+			
+			assertEquals(ev.getParallelSubtaskIndex() / 7, inEdges[0].getSource().getPartition());
+		}
+	}
+	
+	@Test
+	public void testLowHighIrregular() {
+		testLowToHigh(3, 16);
+		testLowToHigh(19, 21);
+		testLowToHigh(15, 20);
+		testLowToHigh(11, 31);
+	}
+	
+	@Test
+	public void testHighLowIrregular() {
+		testHighToLow(16, 3);
+		testHighToLow(21, 19);
+		testHighToLow(20, 15);
+		testHighToLow(31, 11);
+	}
+	
+	private void testLowToHigh(int lowDop, int highDop) {
+		if (highDop < lowDop) {
+			throw new IllegalArgumentException();
+		}
+		
+		final int factor = highDop / lowDop;
+		final int delta = highDop % lowDop == 0 ? 0 : 1;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(lowDop);
+		v2.setParallelism(highDop);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		int[] timesUsed = new int[lowDop];
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertEquals(1, inEdges.length);
+			
+			
+			timesUsed[inEdges[0].getSource().getPartition()]++;
+		}
+		
+		for (int i = 0; i < timesUsed.length; i++) {
+			assertTrue(timesUsed[i] >= factor && timesUsed[i] <= factor + delta);
+		}
+	}
+	
+	private void testHighToLow(int highDop, int lowDop) {
+		if (highDop < lowDop) {
+			throw new IllegalArgumentException();
+		}
+		
+		final int factor = highDop / lowDop;
+		final int delta = highDop % lowDop == 0 ? 0 : 1;
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+		AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+	
+		v1.setParallelism(highDop);
+		v2.setParallelism(lowDop);
+	
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+	
+		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		
+		ExecutionJobVertex target = eg.getAllVertices().get(v2.getID());
+		
+		int[] timesUsed = new int[highDop];
+		
+		for (ExecutionVertex2 ev : target.getTaskVertices()) {
+			assertEquals(1, ev.getNumberOfInputs());
+			
+			ExecutionEdge2[] inEdges = ev.getInputEdges(0);
+			assertTrue(inEdges.length >= factor && inEdges.length <= factor + delta);
+			
+			for (ExecutionEdge2 ee : inEdges) {
+				timesUsed[ee.getSource().getPartition()]++;
+			}
+		}
+		
+		for (int i = 0; i < timesUsed.length; i++) {
+			assertEquals(1, timesUsed[i]);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
deleted file mode 100644
index 7fafeda..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This class represents the cross task in the self cross unit test.
- */
-public class SelfCrossForwardTask extends AbstractInvokable {
-
-	@Override
-	public void registerInputOutput() {
-		new RecordReader<StringRecord>(this, StringRecord.class);
-		new RecordReader<StringRecord>(this, StringRecord.class);
-		new RecordWriter<StringRecord>(this);
-	}
-
-	@Override
-	public void invoke() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
new file mode 100644
index 0000000..540d9fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.junit.Test;
+
+public class VertexSlotSharingTest {
+
+	/*
+	 * Test setup:
+	 * - v1 is isolated, no slot sharing
+	 * - v2 and v3 (not connected) share slots
+	 * - v4 and v5 (connected) share slots
+	 */
+	@Test
+	public void testAssignSlotSharingGroup() {
+		try {
+			AbstractJobVertex v1 = new AbstractJobVertex("v1");
+			AbstractJobVertex v2 = new AbstractJobVertex("v2");
+			AbstractJobVertex v3 = new AbstractJobVertex("v3");
+			AbstractJobVertex v4 = new AbstractJobVertex("v4");
+			AbstractJobVertex v5 = new AbstractJobVertex("v5");
+			
+			v1.setParallelism(4);
+			v2.setParallelism(5);
+			v3.setParallelism(7);
+			v4.setParallelism(1);
+			v5.setParallelism(11);
+			
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+			v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+			
+			SlotSharingGroup jg1 = new SlotSharingGroup();
+			v2.setSlotSharingGroup(jg1);
+			v3.setSlotSharingGroup(jg1);
+			
+			SlotSharingGroup jg2 = new SlotSharingGroup();
+			v4.setSlotSharingGroup(jg2);
+			v5.setSlotSharingGroup(jg2);
+			
+			List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+			
+			ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+			eg.attachJobGraph(vertices);
+			
+			// verify that the vertices are all in the same slot sharing group
+			SlotSharingGroup group1 = null;
+			SlotSharingGroup group2 = null;
+			
+			// verify that v1 tasks have no slot sharing group
+			assertNull(eg.getJobVertex(v1.getID()).getSlotSharingGroup());
+			
+			// v2 and v3 are shared
+			group1 = eg.getJobVertex(v2.getID()).getSlotSharingGroup();
+			assertNotNull(group1);
+			assertEquals(group1, eg.getJobVertex(v3.getID()).getSlotSharingGroup());
+			
+			assertEquals(2, group1.getJobVertexIds().size());
+			assertTrue(group1.getJobVertexIds().contains(v2.getID()));
+			assertTrue(group1.getJobVertexIds().contains(v3.getID()));
+			
+			// v4 and v5 are shared
+			group2 = eg.getJobVertex(v4.getID()).getSlotSharingGroup();
+			assertNotNull(group2);
+			assertEquals(group2, eg.getJobVertex(v5.getID()).getSlotSharingGroup());
+			
+			assertEquals(2, group1.getJobVertexIds().size());
+			assertTrue(group2.getJobVertexIds().contains(v4.getID()));
+			assertTrue(group2.getJobVertexIds().contains(v5.getID()));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
new file mode 100644
index 0000000..c10a5ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+public class AllocatedSlotTest {
+
+	@Test
+	public void testStateTransitions() {
+		try {
+			// cancel, then release
+			{
+				AllocatedSlot slot = getSlot();
+				assertTrue(slot.isAlive());
+				
+				slot.cancel();
+				assertFalse(slot.isAlive());
+				assertTrue(slot.isCanceled());
+				assertFalse(slot.isReleased());
+				
+				slot.releaseSlot();
+				assertFalse(slot.isAlive());
+				assertTrue(slot.isCanceled());
+				assertTrue(slot.isReleased());
+			}
+			
+			// release immediately
+			{
+				AllocatedSlot slot = getSlot();
+				assertTrue(slot.isAlive());
+				
+				slot.releaseSlot();
+				assertFalse(slot.isAlive());
+				assertTrue(slot.isCanceled());
+				assertTrue(slot.isReleased());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSetExecutionVertex() {
+		try {
+			ExecutionVertex2 ev = mock(ExecutionVertex2.class);
+			ExecutionVertex2 ev_2 = mock(ExecutionVertex2.class);
+			
+			// assign to alive slot
+			{
+				AllocatedSlot slot = getSlot();
+				
+				assertTrue(slot.setExecutedVertex(ev));
+				assertEquals(ev, slot.getExecutedVertex());
+				
+				// try to add another one
+				assertFalse(slot.setExecutedVertex(ev_2));
+				assertEquals(ev, slot.getExecutedVertex());
+			}
+			
+			// assign to canceled slot
+			{
+				AllocatedSlot slot = getSlot();
+				slot.cancel();
+				
+				assertFalse(slot.setExecutedVertex(ev));
+				assertNull(slot.getExecutedVertex());
+			}
+			
+			// assign to released
+			{
+				AllocatedSlot slot = getSlot();
+				slot.releaseSlot();
+				
+				assertFalse(slot.setExecutedVertex(ev));
+				assertNull(slot.getExecutedVertex());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testReleaseCancelsVertex() {
+		try {
+			ExecutionVertex2 ev = mock(ExecutionVertex2.class);
+			
+			AllocatedSlot slot = getSlot();
+			assertTrue(slot.setExecutedVertex(ev));
+			assertEquals(ev, slot.getExecutedVertex());
+			
+			slot.cancel();
+			slot.releaseSlot();
+			slot.cancel();
+			
+			verify(ev, times(1)).fail(Matchers.any(Throwable.class));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static AllocatedSlot getSlot() throws Exception {
+		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
+		InetAddress address = InetAddress.getByName("127.0.0.1");
+		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
+		
+		Instance instance = new Instance(connection, new InstanceID(), hardwareDescription, 1);
+		return instance.allocateSlot(new JobID());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
deleted file mode 100644
index 0bec1f3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DefaultInstanceManagerTest.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link org.apache.flink.runtime.instance.DefaultInstanceManager}.
- */
-public class DefaultInstanceManagerTest {
-	
-	@Test
-	public void testInstanceRegistering() {
-		try {
-			DefaultInstanceManager cm = new DefaultInstanceManager();
-			
-			final int ipcPort = 10000;
-			final int dataPort = 20000;
-
-			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
-
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			
-			// register three instances
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, ipcPort + 15, dataPort + 15);
-			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, ipcPort + 30, dataPort + 30);
-			
-			InstanceID i1 = cm.registerTaskManager(ici1, hardwareDescription, 1);
-			InstanceID i2 = cm.registerTaskManager(ici2, hardwareDescription, 2);
-			InstanceID i3 = cm.registerTaskManager(ici3, hardwareDescription, 5);
-			
-			assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(8, cm.getTotalNumberOfSlots());
-			
-			assertEquals(ici1, cm.getAllRegisteredInstances().get(i1).getInstanceConnectionInfo());
-			assertEquals(ici2, cm.getAllRegisteredInstances().get(i2).getInstanceConnectionInfo());
-			assertEquals(ici3, cm.getAllRegisteredInstances().get(i3).getInstanceConnectionInfo());
-
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testRegisteringAlreadyRegistered() {
-		try {
-			DefaultInstanceManager cm = new DefaultInstanceManager();
-			
-			final int ipcPort = 10000;
-			final int dataPort = 20000;
-
-			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ici = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
-			
-			InstanceID i = cm.registerTaskManager(ici, resources, 1);
-
-			assertNotNull(i);
-			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(1, cm.getTotalNumberOfSlots());
-			
-			InstanceID next = cm.registerTaskManager(ici, resources, 1);
-			assertNull(next);
-			
-			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(1, cm.getTotalNumberOfSlots());
-
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testReportHeartbeat() {
-		try {
-			DefaultInstanceManager cm = new DefaultInstanceManager();
-			
-			final int ipcPort = 10000;
-			final int dataPort = 20000;
-
-			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
-
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			
-			// register three instances
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, ipcPort + 1, dataPort + 1);
-			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, ipcPort + 2, dataPort + 2);
-			
-			InstanceID i1 = cm.registerTaskManager(ici1, hardwareDescription, 1);
-			InstanceID i2 = cm.registerTaskManager(ici2, hardwareDescription, 1);
-			InstanceID i3 = cm.registerTaskManager(ici3, hardwareDescription, 1);
-
-			// report some immediate heart beats
-			assertTrue(cm.reportHeartBeat(i1));
-			assertTrue(cm.reportHeartBeat(i2));
-			assertTrue(cm.reportHeartBeat(i3));
-			
-			// report heart beat for non-existing instance
-			assertFalse(cm.reportHeartBeat(new InstanceID()));
-			
-			final long WAIT = 200;
-			CommonTestUtils.sleepUninterruptibly(WAIT);
-			
-			long h1 = cm.getAllRegisteredInstances().get(i1).getLastHeartBeat();
-			long h2 = cm.getAllRegisteredInstances().get(i2).getLastHeartBeat();
-			long h3 = cm.getAllRegisteredInstances().get(i3).getLastHeartBeat();
-
-			// send one heart beat again and verify that the
-			assertTrue(cm.reportHeartBeat(i1));
-			long newH1 = cm.getAllRegisteredInstances().get(i1).getLastHeartBeat();
-			
-			long now = System.currentTimeMillis();
-			
-			assertTrue(now - h1 >= WAIT);
-			assertTrue(now - h2 >= WAIT);
-			assertTrue(now - h3 >= WAIT);
-			assertTrue(now - newH1 <= WAIT);
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testShutdown() {
-		try {
-			DefaultInstanceManager cm = new DefaultInstanceManager();
-			cm.shutdown();
-			
-			try {
-				HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
-				InetAddress address = InetAddress.getByName("127.0.0.1");
-				InstanceConnectionInfo ici = new InstanceConnectionInfo(address, 10000, 20000);
-		
-				cm.registerTaskManager(ici, resources, 1);
-				fail("Should raise exception in shutdown state");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
-			
-			try {
-				cm.reportHeartBeat(new InstanceID());
-				fail("Should raise exception in shutdown state");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * This test checks the clean-up routines of the cluster manager.
-	 */
-	@Test
-	public void testCleanUp() {
-		try {
-			DefaultInstanceManager cm = new DefaultInstanceManager(200, 100);
-
-			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, 10000, 20000);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, 10001, 20001);
-
-			// register three instances
-			InstanceID i1 = cm.registerTaskManager(ici1, resources, 1);
-			InstanceID i2 = cm.registerTaskManager(ici2, resources, 1);
-
-			assertNotNull(i1);
-			assertNotNull(i2);
-			
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			// report a few heatbeats for both of the machines (each 50 msecs)...
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-				assertTrue(cm.reportHeartBeat(i2));
-			}
-			
-			// all should be alive
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			// report a few heatbeats for both only one machine
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-			}
-			
-			// we should have lost one TM by now
-			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(1, cm.getTotalNumberOfSlots());
-			
-			// if the lost TM reports, it should not be accepted
-			assertFalse(cm.reportHeartBeat(i2));
-			
-			// allow the lost TM to re-register itself
-			i2 = cm.registerTaskManager(ici2, resources, 1);
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-			
-			// report a few heatbeats for both of the machines (each 50 msecs)...
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-				assertTrue(cm.reportHeartBeat(i2));
-			}
-			
-			// all should be alive
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
new file mode 100644
index 0000000..e15b461
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.InetAddress;
+
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
+ */
+public class InstanceManagerTest {
+	
+	
+	@Test
+	public void testInstanceRegistering() {
+		try {
+			InstanceManager cm = new InstanceManager();
+			
+			final int ipcPort = 10000;
+			final int dataPort = 20000;
+
+			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
+
+			InetAddress address = InetAddress.getByName("127.0.0.1");
+			
+			// register three instances
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
+			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, ipcPort + 15, dataPort + 15);
+			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, ipcPort + 30, dataPort + 30);
+			
+			InstanceID i1 = cm.registerTaskManager(ici1, hardwareDescription, 1);
+			InstanceID i2 = cm.registerTaskManager(ici2, hardwareDescription, 2);
+			InstanceID i3 = cm.registerTaskManager(ici3, hardwareDescription, 5);
+			
+			assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(8, cm.getTotalNumberOfSlots());
+			
+			assertEquals(ici1, cm.getAllRegisteredInstances().get(i1).getInstanceConnectionInfo());
+			assertEquals(ici2, cm.getAllRegisteredInstances().get(i2).getInstanceConnectionInfo());
+			assertEquals(ici3, cm.getAllRegisteredInstances().get(i3).getInstanceConnectionInfo());
+
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRegisteringAlreadyRegistered() {
+		try {
+			InstanceManager cm = new InstanceManager();
+			
+			final int ipcPort = 10000;
+			final int dataPort = 20000;
+
+			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
+			InetAddress address = InetAddress.getByName("127.0.0.1");
+			InstanceConnectionInfo ici = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
+			
+			InstanceID i = cm.registerTaskManager(ici, resources, 1);
+
+			assertNotNull(i);
+			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(1, cm.getTotalNumberOfSlots());
+			
+			InstanceID next = cm.registerTaskManager(ici, resources, 1);
+			assertNull(next);
+			
+			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(1, cm.getTotalNumberOfSlots());
+
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testReportHeartbeat() {
+		try {
+			InstanceManager cm = new InstanceManager();
+			
+			final int ipcPort = 10000;
+			final int dataPort = 20000;
+
+			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
+
+			InetAddress address = InetAddress.getByName("127.0.0.1");
+			
+			// register three instances
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
+			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, ipcPort + 1, dataPort + 1);
+			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, ipcPort + 2, dataPort + 2);
+			
+			InstanceID i1 = cm.registerTaskManager(ici1, hardwareDescription, 1);
+			InstanceID i2 = cm.registerTaskManager(ici2, hardwareDescription, 1);
+			InstanceID i3 = cm.registerTaskManager(ici3, hardwareDescription, 1);
+
+			// report some immediate heart beats
+			assertTrue(cm.reportHeartBeat(i1));
+			assertTrue(cm.reportHeartBeat(i2));
+			assertTrue(cm.reportHeartBeat(i3));
+			
+			// report heart beat for non-existing instance
+			assertFalse(cm.reportHeartBeat(new InstanceID()));
+			
+			final long WAIT = 200;
+			CommonTestUtils.sleepUninterruptibly(WAIT);
+			
+			long h1 = cm.getAllRegisteredInstances().get(i1).getLastHeartBeat();
+			long h2 = cm.getAllRegisteredInstances().get(i2).getLastHeartBeat();
+			long h3 = cm.getAllRegisteredInstances().get(i3).getLastHeartBeat();
+
+			// send one heart beat again and verify that the
+			assertTrue(cm.reportHeartBeat(i1));
+			long newH1 = cm.getAllRegisteredInstances().get(i1).getLastHeartBeat();
+			
+			long now = System.currentTimeMillis();
+			
+			assertTrue(now - h1 >= WAIT);
+			assertTrue(now - h2 >= WAIT);
+			assertTrue(now - h3 >= WAIT);
+			assertTrue(now - newH1 <= WAIT);
+			
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testShutdown() {
+		try {
+			InstanceManager cm = new InstanceManager();
+			cm.shutdown();
+			
+			try {
+				HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
+				InetAddress address = InetAddress.getByName("127.0.0.1");
+				InstanceConnectionInfo ici = new InstanceConnectionInfo(address, 10000, 20000);
+		
+				cm.registerTaskManager(ici, resources, 1);
+				fail("Should raise exception in shutdown state");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+			
+			try {
+				cm.reportHeartBeat(new InstanceID());
+				fail("Should raise exception in shutdown state");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * This test checks the clean-up routines of the cluster manager.
+	 */
+	@Test
+	public void testCleanUp() {
+		try {
+			InstanceManager cm = new InstanceManager(200, 100);
+
+			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
+			InetAddress address = InetAddress.getByName("127.0.0.1");
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, 10000, 20000);
+			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, 10001, 20001);
+
+			// register three instances
+			InstanceID i1 = cm.registerTaskManager(ici1, resources, 1);
+			InstanceID i2 = cm.registerTaskManager(ici2, resources, 1);
+
+			assertNotNull(i1);
+			assertNotNull(i2);
+			
+			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(2, cm.getTotalNumberOfSlots());
+
+			// report a few heatbeats for both of the machines (each 50 msecs)...
+			for (int i = 0; i < 8; i++) {
+				CommonTestUtils.sleepUninterruptibly(50);
+				
+				assertTrue(cm.reportHeartBeat(i1));
+				assertTrue(cm.reportHeartBeat(i2));
+			}
+			
+			// all should be alive
+			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(2, cm.getTotalNumberOfSlots());
+
+			// report a few heatbeats for both only one machine
+			for (int i = 0; i < 8; i++) {
+				CommonTestUtils.sleepUninterruptibly(50);
+				
+				assertTrue(cm.reportHeartBeat(i1));
+			}
+			
+			// we should have lost one TM by now
+			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(1, cm.getTotalNumberOfSlots());
+			
+			// if the lost TM reports, it should not be accepted
+			assertFalse(cm.reportHeartBeat(i2));
+			
+			// allow the lost TM to re-register itself
+			i2 = cm.registerTaskManager(ici2, resources, 1);
+			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(2, cm.getTotalNumberOfSlots());
+			
+			// report a few heatbeats for both of the machines (each 50 msecs)...
+			for (int i = 0; i < 8; i++) {
+				CommonTestUtils.sleepUninterruptibly(50);
+				
+				assertTrue(cm.reportHeartBeat(i1));
+				assertTrue(cm.reportHeartBeat(i2));
+			}
+			
+			// all should be alive
+			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
+			assertEquals(2, cm.getTotalNumberOfSlots());
+
+			
+			cm.shutdown();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail("Test erroneous: " + e.getMessage());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index bce692d..09da1a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.accumulators.AccumulatorEvent;
 import org.apache.flink.runtime.client.JobCancelResult;
@@ -34,8 +35,6 @@ import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
@@ -43,10 +42,10 @@ import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.ipc.RPC.Server;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.DeploymentManager;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
 import org.apache.flink.runtime.managementgraph.ManagementGraph;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
 import org.apache.flink.runtime.protocols.AccumulatorProtocol;
 import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
 import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
@@ -204,7 +203,7 @@ public class LocalInstanceManagerTest {
 		}
 
 		@Override
-		public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage) {}
+		public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {}
 
 		@Override
 		public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
@@ -215,11 +214,6 @@ public class LocalInstanceManagerTest {
 		public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
 
 		@Override
-		public InputSplitWrapper requestNextInputSplit(JobID jobID, ExecutionVertexID vertexID, IntegerRecord sequenceNumber) {
-			return null;
-		}
-
-		@Override
 		public ManagementGraph getManagementGraph(JobID jobID) {
 			return null;
 		}
@@ -235,12 +229,6 @@ public class LocalInstanceManagerTest {
 		}
 
 		@Override
-		public void killTask(JobID jobID, ManagementVertexID id) {}
-
-		@Override
-		public void logBufferUtilization(JobID jobID) {}
-
-		@Override
 		public int getAvailableSlots() {
 			return 0;
 		}
@@ -257,5 +245,10 @@ public class LocalInstanceManagerTest {
 		public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
 			return new InstanceID();
 		}
+
+		@Override
+		public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex) throws IOException {
+			return null;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotTestUtils.java
new file mode 100644
index 0000000..ca2ef70
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotTestUtils.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+// Placeholder for shared helpers used by slot-related instance tests;
+// no utilities have been added yet — presumably to be filled in by
+// follow-up commits (NOTE(review): confirm this stub is intentional).
+public class SlotTestUtils {
+
+}


Mime
View raw message