flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [37/63] [abbrv] Finalize ExecutionGraph state machine and calls
Date Sun, 21 Sep 2014 02:13:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index 3d891af..e73102a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 /**
- * Broker to hand over {@link SolutionSetUpdateBarrier} from {@link IterationHeadPactTask} to
- * {@link IterationTailPactTask}.
+ * Broker to hand over {@link SolutionSetUpdateBarrier} from 
+ * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} to
+ * {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask}.
  */
 public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index 3bd24a6..a45507a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.event.job.ManagementEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
 import org.apache.flink.runtime.event.job.VertexEvent;
 import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -74,7 +74,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
 
 		@Override
 		public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
-				ExecutionState2 newExecutionState, String optionalMessage)
+				ExecutionState newExecutionState, String optionalMessage)
 		{
 			final long timestamp = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 880abd5..d3a920c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -415,18 +415,18 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	}
 	
 	@Override
-	public void updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
+	public boolean updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
 		Preconditions.checkNotNull(executionState);
 
 
 		final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID());
 		if (eg == null) {
-			LOG.error("Cannot find execution graph for ID " + executionState.getJobID() + " to change state to "
-				+ executionState.getExecutionState());
-			return;
+			LOG.debug("Orphaned execution task: UpdateTaskExecutionState call cannot find execution graph for ID " + executionState.getJobID() +
+					" to change state to " + executionState.getExecutionState());
+			return false;
 		}
 
-		eg.updateState(executionState);
+		return eg.updateState(executionState);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index 06f0eab..54e16b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -27,7 +27,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.instance.InstanceListener;
  */
 public class DefaultScheduler implements InstanceListener, SlotAvailablilityListener {
 
-	protected static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
 	
 	
 	private final Object globalLock = new Object();
@@ -136,6 +136,8 @@ public class DefaultScheduler implements InstanceListener, SlotAvailablilityList
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Scheduling task " + task);
 		}
+		
+		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 	
 		synchronized (globalLock) {
 			// 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
@@ -154,7 +156,7 @@ public class DefaultScheduler implements InstanceListener, SlotAvailablilityList
 			if (sharingUnit != null) {
 				// see if we can add the task to the current sharing group.
 				SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
-				AllocatedSlot slot = assignment.getSlotForTask(task.getJobVertexId(), task.getTaskVertex());
+				AllocatedSlot slot = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
 				if (slot != null) {
 					return slot;
 				}
@@ -165,11 +167,13 @@ public class DefaultScheduler implements InstanceListener, SlotAvailablilityList
 			// we need potentially to loop multiple times, because there may be false positives
 			// in the set-with-available-instances
 			while (true) {
-				Instance instanceToUse = getFreeInstanceForTask(task.getTaskVertex());
+				
+				
+				Instance instanceToUse = getFreeInstanceForTask(task.getTaskToExecute().getVertex());
 			
 				if (instanceToUse != null) {
 					try {
-						AllocatedSlot slot = instanceToUse.allocateSlot(task.getTaskVertex().getJobId());
+						AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
 						
 						// if the instance has further available slots, re-add it to the set of available resources.
 						if (instanceToUse.hasResourcesAvailable()) {
@@ -217,7 +221,7 @@ public class DefaultScheduler implements InstanceListener, SlotAvailablilityList
 	 * @param vertex The task to run. 
 	 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
 	 */
-	protected Instance getFreeInstanceForTask(ExecutionVertex2 vertex) {
+	protected Instance getFreeInstanceForTask(ExecutionVertex vertex) {
 		if (this.instancesWithAvailableResources.isEmpty()) {
 			return null;
 		}
@@ -270,9 +274,10 @@ public class DefaultScheduler implements InstanceListener, SlotAvailablilityList
 			
 			if (queued != null) {
 				ScheduledUnit task = queued.getTask();
+				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 				
 				try {
-					AllocatedSlot newSlot = instance.allocateSlot(task.getTaskVertex().getJobId());
+					AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
 					if (newSlot != null) {
 						
 						// success, remove from the task queue and notify the future
@@ -282,8 +287,8 @@ public class DefaultScheduler implements InstanceListener, SlotAvailablilityList
 								queued.getFuture().setSlot(newSlot);
 							}
 							catch (Throwable t) {
-								LOG.error("Error calling allocation future for task " + task.getTaskVertex().getSimpleName(), t);
-								task.getTaskVertex().fail(t);
+								LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
+								task.getTaskToExecute().fail(t);
 							}
 						}
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 2b0de6f..10190f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -18,48 +18,48 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 public class ScheduledUnit {
 	
-	private final ExecutionVertex2 taskVertex;
+	private final Execution vertexExecution;
 	
 	private final SlotSharingGroup sharingGroup;
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public ScheduledUnit(ExecutionVertex2 taskVertex) {
+	public ScheduledUnit(Execution taskVertex) {
 		if (taskVertex == null) {
 			throw new NullPointerException();
 		}
 		
-		this.taskVertex = taskVertex;
+		this.vertexExecution = taskVertex;
 		this.sharingGroup = null;
 	}
 	
-	public ScheduledUnit(ExecutionVertex2 taskVertex, SlotSharingGroup sharingUnit) {
+	public ScheduledUnit(Execution taskVertex, SlotSharingGroup sharingUnit) {
 		if (taskVertex == null) {
 			throw new NullPointerException();
 		}
 		
-		this.taskVertex = taskVertex;
+		this.vertexExecution = taskVertex;
 		this.sharingGroup = sharingUnit;
 	}
 	
 	ScheduledUnit() {
-		this.taskVertex = null;
+		this.vertexExecution = null;
 		this.sharingGroup = null;
 	}
 
 	// --------------------------------------------------------------------------------------------
 	
 	public JobVertexID getJobVertexId() {
-		return this.taskVertex.getJobvertexId();
+		return this.vertexExecution.getVertex().getJobvertexId();
 	}
 	
-	public ExecutionVertex2 getTaskVertex() {
-		return taskVertex;
+	public Execution getTaskToExecute() {
+		return vertexExecution;
 	}
 	
 	public SlotSharingGroup getSlotSharingGroup() {
@@ -70,6 +70,6 @@ public class ScheduledUnit {
 	
 	@Override
 	public String toString() {
-		return "{vertex=" + taskVertex.getSimpleName() + ", sharingUnit=" + sharingGroup + '}';
+		return "{vertex=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup + '}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index e7968ed..4599d68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -19,17 +19,15 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -42,9 +40,6 @@ public class SlotSharingGroupAssignment {
 	/** The slots available per vertex type (jid), keyed by instance, to make them locatable */
 	private final Map<JobVertexID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<JobVertexID, Map<Instance, List<SharedSlot>>>();
 	
-	/** The tasks that are waiting, per vertex type (jid) */
-	private final Map<JobVertexID, Queue<ExecutionVertex2>> pendingTasks = new HashMap<JobVertexID, Queue<ExecutionVertex2>>();
-	
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -84,7 +79,7 @@ public class SlotSharingGroupAssignment {
 		}
 	}
 	
-	public AllocatedSlot getSlotForTask(JobVertexID jid, ExecutionVertex2 vertex) {
+	public AllocatedSlot getSlotForTask(JobVertexID jid, ExecutionVertex vertex) {
 		synchronized (allSlots) {
 			return getSlotForTaskInternal(jid, vertex.getPreferredLocations());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index b086fc1..f4c67df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -39,10 +39,11 @@ import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
 import org.apache.flink.runtime.event.job.JobEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -248,20 +249,24 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			if (jobEvent.getJobStatus() == JobStatus.FAILED) {
 				wrt.write("\"failednodes\": [");
 				boolean first = true;
-				for (ExecutionVertex2 vertex : graph.getAllExecutionVertices()) {
-					if (vertex.getExecutionState() == ExecutionState2.FAILED) {
-					if (first) {
-						first = false;
-					} else {
-						wrt.write(",");
+				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+					if (vertex.getExecutionState() == ExecutionState.FAILED) {
+						AllocatedSlot slot = vertex.getCurrentAssignedResource();
+						Throwable failureCause = vertex.getFailureCause();
+						if (slot != null || failureCause != null) {
+							if (first) {
+								first = false;
+							} else {
+								wrt.write(",");
+							}
+							wrt.write("{");
+							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().hostname()) + "\",");
+							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
+							wrt.write("}");
+						}
 					}
-					wrt.write("{");
-					wrt.write("\"node\": \"" + vertex.getAssignedResource().getInstance().getInstanceConnectionInfo().hostname() + "\",");
-					wrt.write("\"message\": \"" + StringUtils.escapeHtml(ExceptionUtils.stringifyException(vertex.getFailureCause())) + "\"");
-					wrt.write("}");
 				}
-			}
-			wrt.write("],");
+				wrt.write("],");
 			}
 
 			// Serialize ManagementGraph to json
@@ -307,16 +312,16 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				long ended = 0;
 				
 				// Take earliest running state and latest endstate of groupmembers
-				for (ExecutionVertex2 vertex : groupVertex.getTaskVertices()) {
+				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
 					
-					long running = vertex.getStateTimestamp(ExecutionState2.RUNNING);
+					long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
 					if (running != 0 && running < started) {
 						started = running;
 					}
 					
-					long finished = vertex.getStateTimestamp(ExecutionState2.FINISHED);
-					long canceled = vertex.getStateTimestamp(ExecutionState2.CANCELED);
-					long failed = vertex.getStateTimestamp(ExecutionState2.FAILED);
+					long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
+					long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
+					long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
 					
 					if(finished != 0 && finished > ended) {
 						ended = finished;
@@ -460,7 +465,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
 				
 				int num = 0;
-				for (ExecutionVertex2 vertex : groupVertex.getTaskVertices()) {
+				for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
 					
 					if(first) {
 						first = false;
@@ -470,14 +475,14 @@ public class JobmanagerInfoServlet extends HttpServlet {
 					wrt.write("\""+jobVertex.getJobVertex()+"-"+num +"\": {");
 					wrt.write("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
 					wrt.write("\"vertexname\": \"" + vertex + "\",");
-					wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState2.CREATED) + ",");
-					wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState2.SCHEDULED) + ",");
-					wrt.write("\"STARTING\": "+ vertex.getStateTimestamp(ExecutionState2.DEPLOYING) + ",");
-					wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState2.RUNNING) + ",");
-					wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState2.FINISHED) + ",");
-					wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState2.CANCELING) + ",");
-					wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState2.CANCELED) + ",");
-					wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState2.FAILED) + "");
+					wrt.write("\"CREATED\": "+ vertex.getStateTimestamp(ExecutionState.CREATED) + ",");
+					wrt.write("\"SCHEDULED\": "+ vertex.getStateTimestamp(ExecutionState.SCHEDULED) + ",");
+					wrt.write("\"STARTING\": "+ vertex.getStateTimestamp(ExecutionState.DEPLOYING) + ",");
+					wrt.write("\"RUNNING\": "+ vertex.getStateTimestamp(ExecutionState.RUNNING) + ",");
+					wrt.write("\"FINISHED\": "+ vertex.getStateTimestamp(ExecutionState.FINISHED) + ",");
+					wrt.write("\"CANCELING\": "+ vertex.getStateTimestamp(ExecutionState.CANCELING) + ",");
+					wrt.write("\"CANCELED\": "+ vertex.getStateTimestamp(ExecutionState.CANCELED) + ",");
+					wrt.write("\"FAILED\": "+ vertex.getStateTimestamp(ExecutionState.FAILED) + "");
 					wrt.write("}");
 					
 					num++;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 9084010..745a9f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -22,9 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
@@ -32,14 +32,14 @@ import org.apache.flink.util.StringUtils;
 
 public class JsonFactory {
 
-	public static String toJson(ExecutionVertex2 vertex) {
+	public static String toJson(ExecutionVertex vertex) {
 		StringBuilder json = new StringBuilder("");
 		json.append("{");
 		json.append("\"vertexid\": \"" + vertex.getJobvertexId() + "\",");
 		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
 		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
 		
-		AllocatedSlot slot = vertex.getAssignedResource();
+		AllocatedSlot slot = vertex.getCurrentAssignedResource();
 		String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().hostname();
 		
 		json.append("\"vertexinstancename\": \"" + instanceName + "\"");
@@ -57,17 +57,17 @@ public class JsonFactory {
 		json.append("\"groupmembers\": [");
 		
 		// Count state status of group members
-		Map<ExecutionState2, Integer> stateCounts = new HashMap<ExecutionState2, Integer>();
+		Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
 		
 		// initialize with 0
-		for (ExecutionState2 state : ExecutionState2.values()) {
+		for (ExecutionState state : ExecutionState.values()) {
 			stateCounts.put(state, new Integer(0));
 		}
 		
-		ExecutionVertex2[] vertices = jobVertex.getTaskVertices();
+		ExecutionVertex[] vertices = jobVertex.getTaskVertices();
 		
 		for(int j = 0; j < vertices.length; j++) {
-			ExecutionVertex2 vertex = vertices[j];
+			ExecutionVertex vertex = vertices[j];
 			
 			json.append(toJson(vertex));
 			
@@ -103,7 +103,7 @@ public class JsonFactory {
 		json.append("]");
 		
 		// list number of members for each status
-		for (Map.Entry<ExecutionState2, Integer> stateCount : stateCounts.entrySet()) {
+		for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
 			json.append(",\""+stateCount.getKey()+"\": " + stateCount.getValue());
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
index 9c443ad..2cc1377 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
@@ -74,14 +74,7 @@ public abstract class AbstractMutableHashTable<T> {
 	// ------------- Accessors -------------
 	
 	public abstract MutableObjectIterator<T> getEntryIterator();
-	
-	/**
-	 * 
-	 * @param probeSideComparator
-	 * @param pairComparator
-	 * @param <PT> The type of the probe side.
-	 * @return
-	 */
+
 	public abstract <PT> AbstractHashTableProber<PT, T> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 2eaff60..ec41408 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.File;
@@ -754,11 +753,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		private final ExceptionHandler<IOException> exceptionHandler;
 
 		/**
-		 * The parent task at whom the thread needs to register.
-		 */
-		private final AbstractInvokable parentTask;
-
-		/**
 		 * The flag marking this thread as alive.
 		 */
 		private volatile boolean alive;
@@ -783,7 +777,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			this.setUncaughtExceptionHandler(this);
 
 			this.queues = queues;
-			this.parentTask = parentTask;
 			this.alive = true;
 		}
 
@@ -1170,7 +1163,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * @param exceptionHandler The exception handler to call for all exceptions.
 		 * @param queues The queues used to pass buffers between the threads.
 		 * @param parentTask The task that started this thread. If non-null, it is used to register this thread.
-		 * @param memoryManager The memory manager used to allocate buffers for the readers and writers.
+		 * @param memManager The memory manager used to allocate buffers for the readers and writers.
 		 * @param ioManager The I/I manager used to instantiate readers and writers from.
 		 * @param serializer
 		 * @param comparator
@@ -1449,12 +1442,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channelIDs The IDs of the sorted runs that need to be merged.
 		 * @param writeBuffers The buffers to be used by the writers.
-		 * @param writeBufferSize The size of the write buffers.
-		 * @param  readMemorySize The amount of memory dedicated to the readers.
+
 		 * @return A list of the IDs of the merged channels.
 		 * @throws IOException Thrown, if the readers or writers encountered an I/O problem.
-		 * @throws MemoryAllocationException Thrown, if the specified memory is insufficient to merge the channels
-		 *                                   or if the memory manager could not provide the requested memory.
 		 */
 		protected final List<ChannelWithBlockCount> mergeChannelList(final List<ChannelWithBlockCount> channelIDs,
 					final List<MemorySegment> allReadBuffers, final List<MemorySegment> writeBuffers)
@@ -1548,7 +1538,6 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * @param memory A list containing the memory buffers to be distributed. The buffers are not
 		 *               removed from this list.
 		 * @param numChannels The number of channels for which to allocate buffers. Must not be zero.
-		 * @return A list with all memory segments that were allocated.
 		 */
 		protected final void getSegmentsForReaders(List<List<MemorySegment>> target,
 			List<MemorySegment> memory, int numChannels)
@@ -1586,7 +1575,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		/**
 		 * Adds a channel to the list of channels that are to be removed at shutdown.
 		 * 
-		 * @param s The channel id.
+		 * @param channel The channel id.
 		 */
 		protected void registerChannelToBeRemovedAtShudown(Channel.ID channel) {
 			UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(channel);
@@ -1595,7 +1584,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		/**
 		 * Removes a channel from the list of channels that are to be removed at shutdown.
 		 * 
-		 * @param s The channel id.
+		 * @param channel The channel id.
 		 */
 		protected void unregisterChannelToBeRemovedAtShudown(Channel.ID channel) {
 			UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(channel);
@@ -1604,7 +1593,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		/**
 		 * Adds a channel reader/writer to the list of channels that are to be removed at shutdown.
 		 * 
-		 * @param s The channel reader/writer.
+		 * @param channel The channel reader/writer.
 		 */
 		protected void registerOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
 			UnilateralSortMerger.this.openChannels.add(channel);
@@ -1613,7 +1602,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		/**
 		 * Removes a channel reader/writer from the list of channels that are to be removed at shutdown.
 		 * 
-		 * @param s The channel reader/writer.
+		 * @param channel The channel reader/writer.
 		 */
 		protected void unregisterOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
 			UnilateralSortMerger.this.openChannels.remove(channel);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentListenerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentListenerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentListenerImpl.java
index 4a14e13..e11fc78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentListenerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentListenerImpl.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.profiling.impl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -43,7 +43,7 @@ public class EnvironmentListenerImpl implements ExecutionListener {
 
 
 	@Override
-	public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, ExecutionState2 newExecutionState, String optionalMessage) {
+	public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, ExecutionState newExecutionState, String optionalMessage) {
 
 		switch (newExecutionState) {
 		case RUNNING:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
index 16c2bf5..8e85f2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/JobProfilingData.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -58,8 +58,8 @@ public class JobProfilingData {
 
 	public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) {
 
-		for (ExecutionVertex2 executionVertex : this.executionGraph.getAllExecutionVertices()) {
-			AllocatedSlot slot = executionVertex.getAssignedResource();
+		for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
+			AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
 			if (slot != null && slot.getInstance().getInstanceConnectionInfo().equals(
 					instanceProfilingData.getInstanceConnectionInfo()))
 			{
@@ -75,8 +75,8 @@ public class JobProfilingData {
 
 		final Set<Instance> tempSet = new HashSet<Instance>();
 		
-		for (ExecutionVertex2 executionVertex : this.executionGraph.getAllExecutionVertices()) {
-			AllocatedSlot slot = executionVertex.getAssignedResource();
+		for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
+			AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
 			if (slot != null) {
 				tempSet.add(slot.getInstance());
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
index 9c46ac5..26fd095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
@@ -53,12 +53,13 @@ public interface JobManagerProtocol extends VersionedProtocol {
 	InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) throws IOException;
 
 	/**
-	 * Reports an update of a task's execution state to the job manager.
+	 * Reports an update of a task's execution state to the job manager. This method returns true, if the state was
+	 * correctly registered. If it returns false, the calling task manager should cancel its execution of the task.
 	 * 
-	 * @param taskExecutionState
-	 *        the new task execution state
-	 * @throws IOException
-	 *         thrown if an error occurs during this remote procedure call
+	 * @param taskExecutionState The new task execution state.
+	 * @return True if everything is all right, false if the caller should cancel the task execution.
+	 * 
+	 * @throws IOException Thrown, if an error occurs during this remote procedure call
 	 */
-	void updateTaskExecutionState(TaskExecutionState taskExecutionState) throws IOException;
+	boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
index 1800e3f..a71f760 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileReques
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
 
 /**
@@ -38,7 +37,7 @@ public interface TaskOperationProtocol extends VersionedProtocol {
 
 	TaskOperationResult submitTask(TaskDeploymentDescriptor task) throws IOException;
 
-	TaskOperationResult cancelTask(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId) throws IOException;
+	TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException;
 
 	LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7e48d7c..7b692e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -39,8 +39,8 @@ import org.apache.flink.util.ExceptionUtils;
 public final class Task {
 
 	/** For atomic state updates */
-	private static final AtomicReferenceFieldUpdater<Task, ExecutionState2> STATE_UPDATER = 
-			AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState2.class, "executionState");
+	private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = 
+			AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
 			
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@@ -68,7 +68,7 @@ public final class Task {
 	private volatile RuntimeEnvironment environment;
 	
 	/** The current execution state of the task */
-	private volatile ExecutionState2 executionState = ExecutionState2.DEPLOYING;
+	private volatile ExecutionState executionState = ExecutionState.DEPLOYING;
 
 	// --------------------------------------------------------------------------------------------	
 	
@@ -135,7 +135,7 @@ public final class Task {
 	 * 
 	 * @return the current execution state of the task
 	 */
-	public ExecutionState2 getExecutionState() {
+	public ExecutionState getExecutionState() {
 		return this.executionState;
 	}
 	
@@ -148,8 +148,8 @@ public final class Task {
 	}
 	
 	public boolean isCanceled() {
-		return executionState == ExecutionState2.CANCELING ||
-				executionState == ExecutionState2.CANCELED;
+		return executionState == ExecutionState.CANCELING ||
+				executionState == ExecutionState.CANCELED;
 	}
 	
 	public String getTaskName() {
@@ -172,9 +172,9 @@ public final class Task {
 	 * @return True, if the task correctly enters the state FINISHED.
 	 */
 	public boolean markAsFinished() {
-		if (STATE_UPDATER.compareAndSet(this, ExecutionState2.RUNNING, ExecutionState2.FINISHED)) {
-			notifyObservers(ExecutionState2.FINISHED, null);
-			taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState2.FINISHED, null);
+		if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+			notifyObservers(ExecutionState.FINISHED, null);
+			taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FINISHED, null);
 			return true;
 		} else {
 			return false;
@@ -183,16 +183,15 @@ public final class Task {
 	
 	public void markFailed(Throwable error) {
 		while (true) {
-			ExecutionState2 current = this.executionState;
+			ExecutionState current = this.executionState;
 			
-			if (current == ExecutionState2.CANCELED || current == ExecutionState2.CANCELING) {
+			if (current == ExecutionState.CANCELED || current == ExecutionState.CANCELING) {
 				return;
 			}
 			
-			if (STATE_UPDATER.compareAndSet(this, current, ExecutionState2.FAILED)) {
-				String message = ExceptionUtils.stringifyException(error);
-				notifyObservers(ExecutionState2.FAILED, message);
-				taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState2.FAILED, message);
+			if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+				notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error));
+				taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, error);
 				return;
 			}
 		}
@@ -200,27 +199,27 @@ public final class Task {
 	
 	public void cancelExecution() {
 		while (true) {
-			ExecutionState2 current = this.executionState;
+			ExecutionState current = this.executionState;
 			
 			// if the task is already canceled (or canceling) or finished, then we
 			// need not do anything
-			if (current == ExecutionState2.FINISHED || current == ExecutionState2.CANCELED ||
-					current == ExecutionState2.CANCELING) {
+			if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
+					current == ExecutionState.CANCELING) {
 				return;
 			}
 			
-			if (current == ExecutionState2.DEPLOYING) {
+			if (current == ExecutionState.DEPLOYING) {
 				// directly set to canceled
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState2.CANCELED)) {
-					notifyObservers(ExecutionState2.CANCELED, null);
-					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState2.CANCELED, null);
+				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+					notifyObservers(ExecutionState.CANCELED, null);
+					taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
 					return;
 				}
 			}
-			else if (current == ExecutionState2.RUNNING) {
+			else if (current == ExecutionState.RUNNING) {
 				// go to canceling and perform the actual task canceling
-				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState2.CANCELING)) {
-					notifyObservers(ExecutionState2.CANCELING, null);
+				if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELING)) {
+					notifyObservers(ExecutionState.CANCELING, null);
 					try {
 						this.environment.cancelExecution();
 					} catch (Throwable e) {
@@ -237,9 +236,9 @@ public final class Task {
 	}
 	
 	public void cancelingDone() {
-		if (STATE_UPDATER.compareAndSet(this, ExecutionState2.CANCELING, ExecutionState2.CANCELED)) {
-			notifyObservers(ExecutionState2.CANCELED, null);
-			taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState2.CANCELED, null);
+		if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+			notifyObservers(ExecutionState.CANCELED, null);
+			taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
 		}
 	}
 
@@ -247,7 +246,7 @@ public final class Task {
 	 * Starts the execution of this task.
 	 */
 	public boolean startExecution() {
-		if (STATE_UPDATER.compareAndSet(this, ExecutionState2.DEPLOYING, ExecutionState2.RUNNING)) {
+		if (STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 			final Thread thread = this.environment.getExecutingThread();
 			thread.start();
 			return true;
@@ -314,7 +313,7 @@ public final class Task {
 		this.executionListeners.remove(listener);
 	}
 	
-	private void notifyObservers(ExecutionState2 newState, String message) {
+	private void notifyObservers(ExecutionState newState, String message) {
 		for (ExecutionListener listener : this.executionListeners) {
 			try {
 				listener.executionStateChanged(jobId, vertexId, subtaskIndex, executionId, newState, message);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 8ab67bc..6e0a61d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.types.StringValue;
 
 /**
  * This class represents an update about a task's execution state.
@@ -39,10 +42,15 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 
 	private ExecutionAttemptID executionId;
 
-	private ExecutionState2 executionState;
+	private ExecutionState executionState;
 
-	private String description;
+	private Throwable error;
 
+	
+	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) {
+		this(jobID, executionId, executionState, null);
+	}
+	
 	/**
 	 * Creates a new task execution state.
 	 * 
@@ -52,10 +60,10 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 	 *        the ID of the task execution whose state is to be reported
 	 * @param executionState
 	 *        the execution state to be reported
-	 * @param description
-	 *        an optional description
+	 * @param error
+	 *        an optional error
 	 */
-	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState2 executionState, String description) {
+	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState, Throwable error) {
 		if (jobID == null || executionId == null || executionState == null) {
 			throw new NullPointerException();
 		}
@@ -63,7 +71,7 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 		this.jobID = jobID;
 		this.executionId = executionId;
 		this.executionState = executionState;
-		this.description = description;
+		this.error = error;
 	}
 
 	/**
@@ -76,13 +84,8 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 
 	// --------------------------------------------------------------------------------------------
 	
-	/**
-	 * Returns the description of this task execution state.
-	 * 
-	 * @return the description of this task execution state or <code>null</code> if there is no description available
-	 */
-	public String getDescription() {
-		return this.description;
+	public Throwable getError() {
+		return this.error;
 	}
 
 	/**
@@ -99,7 +102,7 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 	 * 
 	 * @return the new execution state of the task
 	 */
-	public ExecutionState2 getExecutionState() {
+	public ExecutionState getExecutionState() {
 		return this.executionState;
 	}
 
@@ -118,12 +121,24 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 	public void read(DataInputView in) throws IOException {
 		this.jobID.read(in);
 		this.executionId.read(in);
-		this.executionState = ExecutionState2.values()[in.readInt()];
-
-		if (in.readBoolean()) {
-			this.description = StringValue.readString(in);
-		} else {
-			this.description = null;
+		this.executionState = ExecutionState.values()[in.readInt()];
+
+		// read the exception
+		int errorDataLen = in.readInt();
+		if (errorDataLen > 0) {
+			byte[] data = new byte[errorDataLen];
+			in.readFully(data);
+			try {
+				ByteArrayInputStream bis = new ByteArrayInputStream(data);
+				ObjectInputStream ois = new ObjectInputStream(bis);
+				this.error = (Throwable) ois.readObject();
+				ois.close();
+			} catch (Throwable t) {
+				this.error = new Exception("An error occurred, but the exception could not be transfered through the RPC");
+			}
+		}
+		else {
+			this.error = null;
 		}
 	}
 
@@ -133,11 +148,20 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 		this.executionId.write(out);
 		out.writeInt(this.executionState.ordinal());
 
-		if (description != null) {
-			out.writeBoolean(true);
-			StringValue.writeString(description, out);
-		} else {
-			out.writeBoolean(false);
+		// transfer the exception
+		if (this.error != null) {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream oos = new ObjectOutputStream(baos);
+			oos.writeObject(error);
+			oos.flush();
+			oos.close();
+			
+			byte[] data = baos.toByteArray();
+			out.writeInt(data.length);
+			out.write(data);
+		}
+		else {
+			out.writeInt(0);
 		}
 	}
 	
@@ -150,8 +174,10 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 			return other.jobID.equals(this.jobID) &&
 					other.executionId.equals(this.executionId) &&
 					other.executionState == this.executionState &&
-					(other.description == null ? this.description == null :
-						(this.description != null && other.description.equals(this.description)));
+					(other.error == null ? this.error == null :
+						(this.error != null && other.error.getClass() == this.error.getClass()));
+			
+			// NOTE: exception equality does not work, so we can only check for same error class
 		}
 		else {
 			return false;
@@ -165,7 +191,7 @@ public class TaskExecutionState implements IOReadableWritable , java.io.Serializ
 	
 	@Override
 	public String toString() {
-		return String.format("TaskState jobId=%s, executionId=%s, state=%s, description=%s", 
-				jobID, executionId, executionState, description == null ? "(null)" : description);
+		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s", 
+				jobID, executionId, executionState, error == null ? "(null)" : error.getClass().getName() + ": " + error.getMessage());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index f2259c4..fd9f10d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -61,7 +61,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
@@ -501,17 +501,21 @@ public class TaskManager implements TaskOperationProtocol {
 		return Collections.unmodifiableMap(this.runningTasks);
 	}
 	
+	public ChannelManager getChannelManager() {
+		return channelManager;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Task Operation
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public TaskOperationResult cancelTask(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId) throws IOException {
+	public TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException {
 
 		final Task task = this.runningTasks.get(executionId);
 
 		if (task == null) {
-			return new TaskOperationResult(vertexId, subtaskIndex, executionId, false, "No task with that execution ID was found.");
+			return new TaskOperationResult(executionId, false, "No task with that execution ID was found.");
 		}
 
 		// Pass call to executor service so IPC thread can return immediately
@@ -524,7 +528,7 @@ public class TaskManager implements TaskOperationProtocol {
 		this.executorService.execute(r);
 
 		// return success
-		return new TaskOperationResult(vertexId, subtaskIndex, executionId, true);
+		return new TaskOperationResult(executionId, true);
 	}
 
 
@@ -580,7 +584,7 @@ public class TaskManager implements TaskOperationProtocol {
 				}
 			
 				success = true;
-				return new TaskOperationResult(vertexId, taskIndex, executionId, true);
+				return new TaskOperationResult(executionId, true);
 			}
 			finally {
 				if (!success) {
@@ -604,7 +608,7 @@ public class TaskManager implements TaskOperationProtocol {
 				}
 			}
 			
-			return new TaskOperationResult(vertexId, taskIndex, executionId, false, ExceptionUtils.stringifyException(t));
+			return new TaskOperationResult(executionId, false, ExceptionUtils.stringifyException(t));
 		}
 	}
 
@@ -647,12 +651,12 @@ public class TaskManager implements TaskOperationProtocol {
 		}
 	}
 
-	public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState2 newExecutionState, String optionalDescription) {
+	public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable optionalError) {
 		
 		// Get lock on the jobManager object and propagate the state change
 		boolean success = false;
 		try {
-			this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionId, newExecutionState, optionalDescription));
+			success = this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, executionId, newExecutionState, optionalError));
 		}
 		catch (Throwable t) {
 			String msg = "Error sending task state update to JobManager.";
@@ -662,10 +666,9 @@ public class TaskManager implements TaskOperationProtocol {
 		finally {
 			// in case of a failure, or when the tasks is in a finished state, then unregister the
 			// task (free all buffers, remove all channels, task-specific class loaders, etc...)
-			if (!success || newExecutionState == ExecutionState2.FINISHED || newExecutionState == ExecutionState2.CANCELED
-					|| newExecutionState == ExecutionState2.FAILED)
+			if (!success || newExecutionState == ExecutionState.FINISHED || newExecutionState == ExecutionState.CANCELED
+					|| newExecutionState == ExecutionState.FAILED)
 			{
-				
 				unregisterTask(executionId);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
index f0f00a7..f1846a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskOperationResult.java
@@ -24,17 +24,15 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.types.StringValue;
+import org.apache.flink.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 
 
-public class TaskOperationResult implements IOReadableWritable {
-
-	private JobVertexID vertexId;
+public class TaskOperationResult implements IOReadableWritable, java.io.Serializable {
 	
-	private int subtaskIndex;
+	private static final long serialVersionUID = -3852292420229699888L;
+
 	
 	private ExecutionAttemptID executionId;
 	
@@ -44,32 +42,21 @@ public class TaskOperationResult implements IOReadableWritable {
 
 
 	public TaskOperationResult() {
-		this(new JobVertexID(), -1, new ExecutionAttemptID(), false);
+		this(new ExecutionAttemptID(), false);
 	}
 	
-	public TaskOperationResult(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, boolean success) {
-		this(vertexId, subtaskIndex, executionId, success, null);
+	public TaskOperationResult(ExecutionAttemptID executionId, boolean success) {
+		this(executionId, success, null);
 	}
 	
-	public TaskOperationResult(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, boolean success, String description) {
-		Preconditions.checkNotNull(vertexId);
+	public TaskOperationResult(ExecutionAttemptID executionId, boolean success, String description) {
 		Preconditions.checkNotNull(executionId);
 		
-		this.vertexId = vertexId;
-		this.subtaskIndex = subtaskIndex;
 		this.executionId = executionId;
 		this.success = success;
 		this.description = description;
 	}
-
-
-	public JobVertexID getVertexId() {
-		return vertexId;
-	}
 	
-	public int getSubtaskIndex() {
-		return subtaskIndex;
-	}
 	
 	public ExecutionAttemptID getExecutionId() {
 		return executionId;
@@ -82,29 +69,32 @@ public class TaskOperationResult implements IOReadableWritable {
 	public String getDescription() {
 		return description;
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Serialization
+	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void read(DataInputView in) throws IOException {
-		this.vertexId.read(in);
-		this.subtaskIndex = in.readInt();
+		this.executionId.read(in);
 		this.success = in.readBoolean();
-		
-		if (in.readBoolean()) {
-			this.description = StringValue.readString(in);
-		}
+		this.description = StringUtils.readNullableString(in);
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
-		this.vertexId.write(out);
-		out.writeInt(subtaskIndex);
+		this.executionId.write(out);
 		out.writeBoolean(success);
-		
-		if (description != null) {
-			out.writeBoolean(true);
-			StringValue.writeString(description, out);
-		} else {
-			out.writeBoolean(false);
-		}
+		StringUtils.writeNullableString(description, out);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return String.format("TaskOperationResult %s [%s]%s", executionId, 
+				success ? "SUCCESS" : "FAILED", description == null ? "" : " - " + description);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
index 9ae6e2b..8ba0576 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobResultTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.event.job.JobEvent;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.SerializableArrayList;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
index ff6732b..2d7ef37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/job/EventsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -43,8 +43,8 @@ public class EventsTest {
 				JobVertexID jid = new JobVertexID();
 				ExecutionAttemptID eid = new ExecutionAttemptID();
 				
-				ExecutionStateChangeEvent e1 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.CANCELING);
-				ExecutionStateChangeEvent e2 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState2.CANCELING);
+				ExecutionStateChangeEvent e1 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState.CANCELING);
+				ExecutionStateChangeEvent e2 = new ExecutionStateChangeEvent(429345231796596L, jid, 17, eid, ExecutionState.CANCELING);
 				
 				assertTrue(e1.equals(e2));
 				assertEquals(e1.hashCode(), e2.hashCode());
@@ -89,11 +89,11 @@ public class EventsTest {
 				JobVertexID jid = new JobVertexID();
 				ExecutionAttemptID eid = new ExecutionAttemptID();
 				
-				VertexEvent e1 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
-				VertexEvent e2 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
+				VertexEvent e1 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState.DEPLOYING, "foo");
+				VertexEvent e2 = new VertexEvent(64619276017401234L, jid, "peter", 44, 13, eid, ExecutionState.DEPLOYING, "foo");
 				
-				VertexEvent e3 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
-				VertexEvent e4 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+				VertexEvent e3 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState.DEPLOYING, null);
+				VertexEvent e4 = new VertexEvent(64619276017401234L, jid, null, 44, 13, eid, ExecutionState.DEPLOYING, null);
 				
 				assertTrue(e1.equals(e2));
 				assertTrue(e3.equals(e4));
@@ -119,7 +119,7 @@ public class EventsTest {
 			JobVertexID vid = new JobVertexID();
 			ExecutionAttemptID eid = new ExecutionAttemptID();
 			
-			ExecutionStateChangeEvent esce = new ExecutionStateChangeEvent(429345231796596L, vid, 17, eid, ExecutionState2.CANCELING);
+			ExecutionStateChangeEvent esce = new ExecutionStateChangeEvent(429345231796596L, vid, 17, eid, ExecutionState.CANCELING);
 
 			JobEvent je1 = new JobEvent(429345231796596L, JobStatus.CANCELED, "testmessage");
 			JobEvent je2 = new JobEvent(237217579123412431L, JobStatus.RUNNING, null);
@@ -127,8 +127,8 @@ public class EventsTest {
 			RecentJobEvent rce1 = new RecentJobEvent(jid, "some name", JobStatus.FAILED, false, 634563546, 546734734672572457L);
 			RecentJobEvent rce2 = new RecentJobEvent(jid, null, JobStatus.RUNNING, false, 42364716239476L, 127843618273607L);
 
-			VertexEvent ve1 = new VertexEvent(64619276017401234L, vid, "peter", 44, 13, eid, ExecutionState2.DEPLOYING, "foo");
-			VertexEvent ve2 = new VertexEvent(64619276017401234L, vid, null, 44, 13, eid, ExecutionState2.DEPLOYING, null);
+			VertexEvent ve1 = new VertexEvent(64619276017401234L, vid, "peter", 44, 13, eid, ExecutionState.DEPLOYING, "foo");
+			VertexEvent ve2 = new VertexEvent(64619276017401234L, vid, null, 44, 13, eid, ExecutionState.DEPLOYING, null);
 			
 			assertEquals(esce, CommonTestUtils.createCopyWritable(esce));
 			assertEquals(je1, CommonTestUtils.createCopyWritable(je1));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
index 3dd15f6..2095365 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.event.task;
 
 import static org.junit.Assert.assertEquals;
@@ -27,7 +26,6 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index b6f532e..642af31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -20,14 +20,12 @@ package org.apache.flink.runtime.executiongraph;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.mockito.Matchers;
 
 import java.util.ArrayList;
@@ -40,6 +38,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -47,19 +46,12 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.LogUtils;
 
 /**
  * This class contains test concerning the correct conversion from {@link JobGraph} to {@link ExecutionGraph} objects.
  */
 public class ExecutionGraphConstructionTest {
 	
-	@BeforeClass
-	public static void setLogLevel() {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-
-	
 	/**
 	 * Creates a JobGraph of the following form:
 	 * 
@@ -260,7 +252,7 @@ public class ExecutionGraphConstructionTest {
 			assertEquals(v1.getParallelism(), e1.getTaskVertices().length);
 			
 			int num = 0;
-			for (ExecutionVertex2 ev : e1.getTaskVertices()) {
+			for (ExecutionVertex ev : e1.getTaskVertices()) {
 				assertEquals(jobId, ev.getJobId());
 				assertEquals(v1.getID(), ev.getJobvertexId());
 				
@@ -268,6 +260,8 @@ public class ExecutionGraphConstructionTest {
 				assertEquals(num++, ev.getParallelSubtaskIndex());
 				
 				assertEquals(0, ev.getNumberOfInputs());
+				
+				assertTrue(ev.getStateTimestamp(ExecutionState.CREATED) > 0);
 			}
 		}
 		
@@ -285,7 +279,7 @@ public class ExecutionGraphConstructionTest {
 			assertEquals(v2.getParallelism(), e2.getTaskVertices().length);
 			
 			int num = 0;
-			for (ExecutionVertex2 ev : e2.getTaskVertices()) {
+			for (ExecutionVertex ev : e2.getTaskVertices()) {
 				assertEquals(jobId, ev.getJobId());
 				assertEquals(v2.getID(), ev.getJobvertexId());
 				
@@ -293,11 +287,11 @@ public class ExecutionGraphConstructionTest {
 				assertEquals(num++, ev.getParallelSubtaskIndex());
 				
 				assertEquals(1, ev.getNumberOfInputs());
-				ExecutionEdge2[] inputs = ev.getInputEdges(0);
+				ExecutionEdge[] inputs = ev.getInputEdges(0);
 				assertEquals(v1.getParallelism(), inputs.length);
 				
 				int sumOfPartitions = 0;
-				for (ExecutionEdge2 inEdge : inputs) {
+				for (ExecutionEdge inEdge : inputs) {
 					assertEquals(0,inEdge.getInputNum());
 					sumOfPartitions += inEdge.getSource().getPartition();
 				}
@@ -322,7 +316,7 @@ public class ExecutionGraphConstructionTest {
 			assertEquals(v3.getParallelism(), e3.getTaskVertices().length);
 			
 			int num = 0;
-			for (ExecutionVertex2 ev : e3.getTaskVertices()) {
+			for (ExecutionVertex ev : e3.getTaskVertices()) {
 				assertEquals(jobId, ev.getJobId());
 				assertEquals(v3.getID(), ev.getJobvertexId());
 				
@@ -346,7 +340,7 @@ public class ExecutionGraphConstructionTest {
 			assertEquals(v4.getParallelism(), e4.getTaskVertices().length);
 			
 			int num = 0;
-			for (ExecutionVertex2 ev : e4.getTaskVertices()) {
+			for (ExecutionVertex ev : e4.getTaskVertices()) {
 				assertEquals(jobId, ev.getJobId());
 				assertEquals(v4.getID(), ev.getJobvertexId());
 				
@@ -356,11 +350,11 @@ public class ExecutionGraphConstructionTest {
 				assertEquals(2, ev.getNumberOfInputs());
 				// first input
 				{
-					ExecutionEdge2[] inputs = ev.getInputEdges(0);
+					ExecutionEdge[] inputs = ev.getInputEdges(0);
 					assertEquals(v2.getParallelism(), inputs.length);
 					
 					int sumOfPartitions = 0;
-					for (ExecutionEdge2 inEdge : inputs) {
+					for (ExecutionEdge inEdge : inputs) {
 						assertEquals(0, inEdge.getInputNum());
 						sumOfPartitions += inEdge.getSource().getPartition();
 					}
@@ -369,11 +363,11 @@ public class ExecutionGraphConstructionTest {
 				}
 				// second input
 				{
-					ExecutionEdge2[] inputs = ev.getInputEdges(1);
+					ExecutionEdge[] inputs = ev.getInputEdges(1);
 					assertEquals(v3.getParallelism(), inputs.length);
 					
 					int sumOfPartitions = 0;
-					for (ExecutionEdge2 inEdge : inputs) {
+					for (ExecutionEdge inEdge : inputs) {
 						assertEquals(1, inEdge.getInputNum());
 						sumOfPartitions += inEdge.getSource().getPartition();
 					}
@@ -395,7 +389,7 @@ public class ExecutionGraphConstructionTest {
 			assertEquals(v5.getParallelism(), e5.getTaskVertices().length);
 			
 			int num = 0;
-			for (ExecutionVertex2 ev : e5.getTaskVertices()) {
+			for (ExecutionVertex ev : e5.getTaskVertices()) {
 				assertEquals(jobId, ev.getJobId());
 				assertEquals(v5.getID(), ev.getJobvertexId());
 				
@@ -405,11 +399,11 @@ public class ExecutionGraphConstructionTest {
 				assertEquals(2, ev.getNumberOfInputs());
 				// first input
 				{
-					ExecutionEdge2[] inputs = ev.getInputEdges(0);
+					ExecutionEdge[] inputs = ev.getInputEdges(0);
 					assertEquals(v4.getParallelism(), inputs.length);
 					
 					int sumOfPartitions = 0;
-					for (ExecutionEdge2 inEdge : inputs) {
+					for (ExecutionEdge inEdge : inputs) {
 						assertEquals(0, inEdge.getInputNum());
 						sumOfPartitions += inEdge.getSource().getPartition();
 					}
@@ -418,11 +412,11 @@ public class ExecutionGraphConstructionTest {
 				}
 				// second input
 				{
-					ExecutionEdge2[] inputs = ev.getInputEdges(1);
+					ExecutionEdge[] inputs = ev.getInputEdges(1);
 					assertEquals(v3.getParallelism(), inputs.length);
 					
 					int sumOfPartitions = 0;
-					for (ExecutionEdge2 inEdge : inputs) {
+					for (ExecutionEdge inEdge : inputs) {
 						assertEquals(1, inEdge.getInputNum());
 						sumOfPartitions += inEdge.getSource().getPartition();
 					}
@@ -567,4 +561,41 @@ public class ExecutionGraphConstructionTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testMoreThanOneConsumerForIntermediateResult() {
+		try {
+			final JobID jobId = new JobID();
+			final String jobName = "Test Job Sample Name";
+			final Configuration cfg = new Configuration();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("vertex1");
+			AbstractJobVertex v2 = new AbstractJobVertex("vertex2");
+			AbstractJobVertex v3 = new AbstractJobVertex("vertex3");
+			
+			v1.setParallelism(5);
+			v2.setParallelism(7);
+			v3.setParallelism(2);
+
+			IntermediateDataSet result = v1.createAndAddResultDataSet();
+			v2.connectDataSetAsInput(result, DistributionPattern.BIPARTITE);
+			v3.connectDataSetAsInput(result, DistributionPattern.BIPARTITE);
+			
+			List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
+
+			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg);
+
+			try {
+				eg.attachJobGraph(ordered);
+				fail("Should not be possible");
+			}
+			catch (RuntimeException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 9705dcd..ac76623 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.executiongraph;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.spy;
@@ -31,27 +29,32 @@ import static org.mockito.Mockito.doAnswer;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+
 import org.junit.Test;
+
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class ExecutionGraphDeploymentTest {
-
+	
 	@Test
 	public void testBuildDeploymentDescriptor() {
 		try {
@@ -97,7 +100,7 @@ public class ExecutionGraphDeploymentTest {
 			eg.attachJobGraph(ordered);
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
-			ExecutionVertex2 vertex = ejv.getTaskVertices()[3];
+			ExecutionVertex vertex = ejv.getTaskVertices()[3];
 			
 			// just some reference (needs not be atomic)
 			final AtomicReference<TaskDeploymentDescriptor> reference = new AtomicReference<TaskDeploymentDescriptor>();
@@ -107,18 +110,21 @@ public class ExecutionGraphDeploymentTest {
 			when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenAnswer(new Answer<TaskOperationResult>() {
 				@Override
 				public TaskOperationResult answer(InvocationOnMock invocation) {
-					final TaskDeploymentDescriptor parameter = (TaskDeploymentDescriptor) invocation.getArguments()[0];
-					reference.set(parameter);
-					return new TaskOperationResult(jid2, 0, true);
+					final TaskDeploymentDescriptor tdd = (TaskDeploymentDescriptor) invocation.getArguments()[0];
+					reference.set(tdd);
+					return new TaskOperationResult(tdd.getExecutionId(), true);
 				}
 			});
 			
 			final Instance instance = getInstance(taskManager);
 			final AllocatedSlot slot = instance.allocateSlot(jobId);
 			
-			assertEquals(ExecutionState2.CREATED, vertex.getExecutionState());
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			
+			LibraryCacheManager.register(jobId, new String[0]);
 			vertex.deployToSlot(slot);
-			assertEquals(ExecutionState2.RUNNING, vertex.getExecutionState());
+			
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			TaskDeploymentDescriptor descr = reference.get();
 			assertNotNull(descr);
@@ -142,4 +148,128 @@ public class ExecutionGraphDeploymentTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testRegistrationOfExecutionsFinishing() {
+		try {
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(7650, 2350);
+			
+			for (Execution e : executions.values()) {
+				e.markFinished();
+			}
+			
+			assertEquals(0, executions.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRegistrationOfExecutionsFailing() {
+		try {
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(7, 6);
+			
+			for (Execution e : executions.values()) {
+				e.markFailed(null);
+			}
+			
+			assertEquals(0, executions.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRegistrationOfExecutionsFailedExternally() {
+		try {
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(7, 6);
+			
+			for (Execution e : executions.values()) {
+				e.fail(null);
+			}
+			
+			assertEquals(0, executions.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testRegistrationOfExecutionsCanceled() {
+		try {
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(19, 37);
+			
+			for (Execution e : executions.values()) {
+				e.cancel();
+				e.cancelingComplete();
+			}
+			
+			assertEquals(0, executions.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private Map<ExecutionAttemptID, Execution> setupExecution(int dop1, int dop2) throws Exception {
+		final JobID jobId = new JobID();
+		
+		final JobVertexID jid1 = new JobVertexID();
+		final JobVertexID jid2 = new JobVertexID();
+		
+		AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+		AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+		
+		v1.setParallelism(dop1);
+		v2.setParallelism(dop2);
+		
+		v1.setInvokableClass(RegularPactTask.class);
+		v2.setInvokableClass(RegularPactTask.class);
+		
+		// execution graph that executes actions synchronously
+		ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration());
+		eg.setQueuedSchedulingAllowed(false);
+		
+		List<AbstractJobVertex> ordered = Arrays.asList(v1, v2);
+		eg.attachJobGraph(ordered);
+		
+		// create a mock taskmanager that accepts deployment calls
+		TaskOperationProtocol taskManager = mock(TaskOperationProtocol.class);
+		when(taskManager.submitTask(Matchers.any(TaskDeploymentDescriptor.class))).thenAnswer(new Answer<TaskOperationResult>() {
+			@Override
+			public TaskOperationResult answer(InvocationOnMock invocation) {
+				final TaskDeploymentDescriptor tdd = (TaskDeploymentDescriptor) invocation.getArguments()[0];
+				return new TaskOperationResult(tdd.getExecutionId(), true);
+			}
+		});
+		when(taskManager.cancelTask(Matchers.any(ExecutionAttemptID.class))).thenAnswer(new Answer<TaskOperationResult>() {
+			@Override
+			public TaskOperationResult answer(InvocationOnMock invocation) {
+				final ExecutionAttemptID id = (ExecutionAttemptID) invocation.getArguments()[0];
+				return new TaskOperationResult(id, true);
+			}
+		});
+		
+		DefaultScheduler scheduler = new DefaultScheduler();
+		for (int i = 0; i < dop1 + dop2; i++) {
+			scheduler.newInstanceAvailable(getInstance(taskManager));
+		}
+		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
+		
+		// schedule, this triggers mock deployment
+		LibraryCacheManager.register(jobId, new String[0]);
+		eg.scheduleForExecution(scheduler);
+		
+		Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
+		assertEquals(dop1 + dop2, executions.size());
+		
+		return executions;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 2207475..39c5678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
@@ -28,7 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
@@ -36,9 +39,11 @@ import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -49,28 +54,43 @@ public class ExecutionGraphTestUtils {
 	//  state modifications
 	// --------------------------------------------------------------------------------------------
 	
-	public static void setVertexState(ExecutionVertex2 vertex, ExecutionState2 state) {
+	public static void setVertexState(ExecutionVertex vertex, ExecutionState state) {
 		try {
-			Field f = ExecutionVertex2.class.getDeclaredField("state");
+			Execution exec = vertex.getCurrentExecutionAttempt();
+			
+			Field f = Execution.class.getDeclaredField("state");
 			f.setAccessible(true);
-			f.set(vertex, state);
+			f.set(exec, state);
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Modifying the state failed", e);
 		}
 	}
 	
-	public static void setVertexResource(ExecutionVertex2 vertex, AllocatedSlot slot) {
+	public static void setVertexResource(ExecutionVertex vertex, AllocatedSlot slot) {
 		try {
-			Field f = ExecutionVertex2.class.getDeclaredField("assignedSlot");
+			Execution exec = vertex.getCurrentExecutionAttempt();
+			
+			Field f = Execution.class.getDeclaredField("assignedResource");
 			f.setAccessible(true);
-			f.set(vertex, slot);
+			f.set(exec, slot);
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Modifying the slot failed", e);
 		}
 	}
 	
+	public static void setGraphStatus(ExecutionGraph graph, JobStatus status) {
+		try {
+			Field f = ExecutionGraph.class.getDeclaredField("state");
+			f.setAccessible(true);
+			f.set(graph, status);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Modifying the status failed", e);
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  utility mocking methods
 	// --------------------------------------------------------------------------------------------
@@ -88,6 +108,28 @@ public class ExecutionGraphTestUtils {
 		};
 	}
 	
+	public static TaskOperationProtocol getSimpleAcknowledgingTaskmanager() throws Exception {
+		TaskOperationProtocol top = mock(TaskOperationProtocol.class);
+		
+		when(top.submitTask(any(TaskDeploymentDescriptor.class))).thenAnswer(new Answer<TaskOperationResult>() {
+			@Override
+			public TaskOperationResult answer(InvocationOnMock invocation) {
+				final TaskDeploymentDescriptor tdd = (TaskDeploymentDescriptor) invocation.getArguments()[0];
+				return new TaskOperationResult(tdd.getExecutionId(), true);
+			}
+		});
+		
+		when(top.cancelTask(Matchers.any(ExecutionAttemptID.class))).thenAnswer(new Answer<TaskOperationResult>() {
+			@Override
+			public TaskOperationResult answer(InvocationOnMock invocation) {
+				final ExecutionAttemptID id = (ExecutionAttemptID) invocation.getArguments()[0];
+				return new TaskOperationResult(id, true);
+			}
+		});
+		
+		return top;
+	}
+	
 	public static ExecutionJobVertex getJobVertexNotExecuting(JobVertexID id) throws JobException {
 		ExecutionJobVertex ejv = getJobVertexBase(id);
 		
@@ -153,7 +195,20 @@ public class ExecutionGraphTestUtils {
 		
 		ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration());
 		
-		return spy(new ExecutionJobVertex(graph, ajv, 1));
+		ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1));
+		
+		Answer<Void> noop = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) {
+				return null;
+			}
+		};
+		
+		doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
+		doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
+		doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
+		
+		return ejv;
 	}
 	
 	// --------------------------------------------------------------------------------------------


Mime
View raw message