flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [40/63] [abbrv] git commit: Finalize ExecutionGraph state machine and calls
Date Sun, 21 Sep 2014 02:13:04 GMT
Finalize ExecutionGraph state machine and calls


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae139f5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae139f5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae139f5a

Branch: refs/heads/master
Commit: ae139f5ae2199a52e8d7f561f94db51631107d00
Parents: 43e7d0f
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Sep 11 07:18:56 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |   24 +-
 .../deployment/ChannelDeploymentDescriptor.java |    4 +-
 .../deployment/GateDeploymentDescriptor.java    |   10 +-
 .../event/job/ExecutionStateChangeEvent.java    |   12 +-
 .../flink/runtime/event/job/VertexEvent.java    |   12 +-
 .../runtime/execution/ExecutionAttempt.java     |  100 --
 .../runtime/execution/ExecutionListener.java    |    2 +-
 .../runtime/execution/ExecutionObserver.java    |    2 +-
 .../flink/runtime/execution/ExecutionState.java |   60 +-
 .../runtime/execution/ExecutionState2.java      |   38 -
 .../librarycache/LibraryCacheManager.java       |   12 +-
 .../executiongraph/AllVerticesIterator.java     |    6 +-
 .../flink/runtime/executiongraph/Execution.java |  606 ++++++++
 .../executiongraph/ExecutionAttempt.java        |  111 --
 .../runtime/executiongraph/ExecutionEdge.java   |   74 +
 .../runtime/executiongraph/ExecutionEdge2.java  |   74 -
 .../runtime/executiongraph/ExecutionGraph.java  | 1387 +++---------------
 .../executiongraph/ExecutionJobVertex.java      |   95 +-
 .../runtime/executiongraph/ExecutionState.java  |   34 -
 .../runtime/executiongraph/ExecutionVertex.java | 1230 ++++------------
 .../executiongraph/ExecutionVertex2.java        |  710 ---------
 .../IntermediateResultPartition.java            |   22 +-
 .../flink/runtime/instance/AllocatedSlot.java   |   27 +-
 .../apache/flink/runtime/instance/Instance.java |   26 +-
 .../instance/InstanceConnectionInfo.java        |   13 +-
 .../runtime/io/network/ChannelManager.java      |    4 +
 .../runtime/io/network/RemoteReceiver.java      |   20 +-
 .../runtime/io/network/api/MutableReader.java   |   10 -
 .../concurrent/SolutionSetUpdateBarrier.java    |    4 +-
 .../SolutionSetUpdateBarrierBroker.java         |    6 +-
 .../runtime/jobmanager/EventCollector.java      |    4 +-
 .../flink/runtime/jobmanager/JobManager.java    |   10 +-
 .../jobmanager/scheduler/DefaultScheduler.java  |   23 +-
 .../jobmanager/scheduler/ScheduledUnit.java     |   22 +-
 .../scheduler/SlotSharingGroupAssignment.java   |    9 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |   61 +-
 .../runtime/jobmanager/web/JsonFactory.java     |   18 +-
 .../hash/AbstractMutableHashTable.java          |    9 +-
 .../operators/sort/UnilateralSortMerger.java    |   23 +-
 .../profiling/impl/EnvironmentListenerImpl.java |    4 +-
 .../profiling/impl/JobProfilingData.java        |   10 +-
 .../runtime/protocols/JobManagerProtocol.java   |   13 +-
 .../protocols/TaskOperationProtocol.java        |    3 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   61 +-
 .../runtime/taskmanager/TaskExecutionState.java |   88 +-
 .../flink/runtime/taskmanager/TaskManager.java  |   25 +-
 .../taskmanager/TaskOperationResult.java        |   62 +-
 .../flink/runtime/client/JobResultTest.java     |    1 -
 .../flink/runtime/event/job/EventsTest.java     |   20 +-
 .../flink/runtime/event/task/TaskEventTest.java |    2 -
 .../ExecutionGraphConstructionTest.java         |   81 +-
 .../ExecutionGraphDeploymentTest.java           |  150 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   71 +-
 .../ExecutionStateProgressTest.java             |   79 +
 .../ExecutionVertexCancelTest.java              |  221 ++-
 .../ExecutionVertexDeploymentTest.java          |  203 ++-
 .../ExecutionVertexSchedulingTest.java          |  141 ++
 .../executiongraph/PointwisePatternTest.java    |   30 +-
 .../apache/flink/runtime/fs/LineReaderTest.java |    2 -
 .../runtime/instance/AllocatedSlotTest.java     |    8 +-
 .../instance/LocalInstanceManagerTest.java      |    5 +-
 .../io/disk/iomanager/IOManagerITCase.java      |    6 +-
 .../runtime/jobmanager/JobManagerITCase.java    |    5 +-
 .../runtime/jobmanager/JobManagerTest.java      |   23 -
 .../scheduler/SchedulerIsolatedTasksTest.java   |   32 +-
 .../scheduler/SchedulerTestUtils.java           |   50 +-
 .../runtime/operators/hash/HashTableITCase.java |    3 +-
 .../CombiningUnilateralSortMergerITCase.java    |    3 +-
 .../taskmanager/TaskExecutionStateTest.java     |   33 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  139 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   50 +-
 71 files changed, 2681 insertions(+), 3857 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index fd4b375..047b9af 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -30,17 +30,29 @@ import java.io.StringWriter;
 public class ExceptionUtils {
 
 	/**
-	 * Makes a string representation of the exception's stack trace.
+	 * Makes a string representation of the exception's stack trace, or "(null)", if the
+	 * exception is null.
+	 * 
+	 * This method makes a best effort and never fails.
 	 * 
 	 * @param e The exception to stringify.
 	 * @return A string with exception name and call stack.
 	 */
 	public static String stringifyException(final Throwable e) {
-		final StringWriter stm = new StringWriter();
-		final PrintWriter wrt = new PrintWriter(stm);
-		e.printStackTrace(wrt);
-		wrt.close();
-		return stm.toString();
+		if (e == null) {
+			return "(null)";
+		}
+		
+		try {
+			StringWriter stm = new StringWriter();
+			PrintWriter wrt = new PrintWriter(stm);
+			e.printStackTrace(wrt);
+			wrt.close();
+			return stm.toString();
+		}
+		catch (Throwable t) {
+			return e.getClass().getName() + " (error while printing stack trace)";
+		}
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
index b4a38f2..6cd18c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge2;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
 
 /**
@@ -95,7 +95,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static ChannelDeploymentDescriptor fromExecutionEdge(ExecutionEdge2 edge) {
+	public static ChannelDeploymentDescriptor fromExecutionEdge(ExecutionEdge edge) {
 		return new ChannelDeploymentDescriptor(edge.getOutputChannelId(), edge.getInputChannelId());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
index e4a447f..71a19d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge2;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 
 /**
  * A gate deployment descriptor contains the deployment descriptors for the channels associated with that gate.
@@ -83,17 +83,17 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static GateDeploymentDescriptor fromEdges(List<ExecutionEdge2> edges) {
+	public static GateDeploymentDescriptor fromEdges(List<ExecutionEdge> edges) {
 		List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(edges.size());
-		for (ExecutionEdge2 edge : edges) {
+		for (ExecutionEdge edge : edges) {
 			channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge));
 		}
 		return new GateDeploymentDescriptor(channels);
 	}
 	
-	public static GateDeploymentDescriptor fromEdges(ExecutionEdge2[] edges) {
+	public static GateDeploymentDescriptor fromEdges(ExecutionEdge[] edges) {
 		List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(edges.length);
-		for (ExecutionEdge2 edge : edges) {
+		for (ExecutionEdge edge : edges) {
 			channels.add(ChannelDeploymentDescriptor.fromExecutionEdge(edge));
 		}
 		return new GateDeploymentDescriptor(channels);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
index f233d49..15d2fe6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -41,7 +41,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 	
 	private ExecutionAttemptID executionAttemptId;
 
-	private ExecutionState2 newExecutionState;
+	private ExecutionState newExecutionState;
 
 	/**
 	 * Constructs a new vertex event object.
@@ -54,7 +54,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 	 *        the new execution state of the vertex this event refers to
 	 */
 	public ExecutionStateChangeEvent(long timestamp, JobVertexID vertexId, int subtask,
-			ExecutionAttemptID executionAttemptId, ExecutionState2 newExecutionState)
+			ExecutionAttemptID executionAttemptId, ExecutionState newExecutionState)
 	{
 		super(timestamp);
 		
@@ -79,7 +79,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 
 		this.vertexId = new JobVertexID();
 		this.executionAttemptId = new ExecutionAttemptID();
-		this.newExecutionState = ExecutionState2.CREATED;
+		this.newExecutionState = ExecutionState.CREATED;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -106,7 +106,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 	 * 
 	 * @return the new execution state of the vertex this event refers to
 	 */
-	public ExecutionState2 getNewExecutionState() {
+	public ExecutionState getNewExecutionState() {
 		return this.newExecutionState;
 	}
 
@@ -118,7 +118,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 		this.vertexId.read(in);
 		this.executionAttemptId.read(in);
 		this.subtask = in.readInt();
-		this.newExecutionState = ExecutionState2.values()[in.readInt()];
+		this.newExecutionState = ExecutionState.values()[in.readInt()];
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
index c697aa4..a935ba9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.StringUtils;
@@ -53,7 +53,7 @@ public class VertexEvent extends AbstractEvent {
 	private ExecutionAttemptID executionAttemptId;
 
 	/** The current execution state of the subtask this event belongs to. */
-	private ExecutionState2 currentExecutionState;
+	private ExecutionState currentExecutionState;
 
 	/** An optional more detailed description of the event. */
 	private String description;
@@ -78,7 +78,7 @@ public class VertexEvent extends AbstractEvent {
 	 */
 	public VertexEvent(long timestamp, JobVertexID jobVertexID, String jobVertexName,
 			int totalNumberOfSubtasks, int indexOfSubtask, ExecutionAttemptID executionAttemptId,
-			ExecutionState2 currentExecutionState, String description)
+			ExecutionState currentExecutionState, String description)
 	{
 		super(timestamp);
 		
@@ -106,7 +106,7 @@ public class VertexEvent extends AbstractEvent {
 		this.totalNumberOfSubtasks = -1;
 		this.indexOfSubtask = -1;
 		this.executionAttemptId = new ExecutionAttemptID();
-		this.currentExecutionState = ExecutionState2.CREATED;
+		this.currentExecutionState = ExecutionState.CREATED;
 	}
 
 	/**
@@ -152,7 +152,7 @@ public class VertexEvent extends AbstractEvent {
 	 * 
 	 * @return the current execution state of the subtask this event belongs to
 	 */
-	public ExecutionState2 getCurrentExecutionState() {
+	public ExecutionState getCurrentExecutionState() {
 		return currentExecutionState;
 	}
 
@@ -181,7 +181,7 @@ public class VertexEvent extends AbstractEvent {
 		this.executionAttemptId.read(in);
 		this.totalNumberOfSubtasks = in.readInt();
 		this.indexOfSubtask = in.readInt();
-		this.currentExecutionState = ExecutionState2.values()[in.readInt()];
+		this.currentExecutionState = ExecutionState.values()[in.readInt()];
 		this.jobVertexName = StringUtils.readNullableString(in);
 		this.description = StringUtils.readNullableString(in);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
deleted file mode 100644
index 9b39851..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * An attempt to execute a task for a {@link ExecutionVertex2}.
- */
-public class ExecutionAttempt implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-
-	private final JobVertexID vertexId;
-	
-	private final int subtaskIndex;
-	
-	private final ExecutionAttemptID executionId;
-	
-	private final int attempt;
-
-	// --------------------------------------------------------------------------------------------
-	
-	public ExecutionAttempt(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, int attempt) {
-		if (vertexId == null || executionId == null || subtaskIndex < 0 || attempt < 1) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.vertexId = vertexId;
-		this.subtaskIndex = subtaskIndex;
-		this.executionId = executionId;
-		this.attempt = attempt;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public JobVertexID getVertexId() {
-		return vertexId;
-	}
-	
-	public int getSubtaskIndex() {
-		return subtaskIndex;
-	}
-	
-	public ExecutionAttemptID getExecutionId() {
-		return executionId;
-	}
-	
-	public int getAttempt() {
-		return attempt;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return vertexId.hashCode() +
-				executionId.hashCode() +
-				31 * subtaskIndex +
-				17 * attempt;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ExecutionAttempt) {
-			ExecutionAttempt other = (ExecutionAttempt) obj;
-			return this.executionId.equals(other.executionId) &&
-					this.vertexId.equals(other.vertexId) &&
-					this.subtaskIndex == other.subtaskIndex &&
-					this.attempt == other.attempt;
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return String.format("ExecutionAttempt (vertex=%s, subtask=%d, executionAttemptId=%s, attempt=%d)",
-				vertexId, subtaskIndex, executionId, attempt);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
index b08c847..74f94cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
@@ -29,5 +29,5 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 public interface ExecutionListener {
 
 	void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
-			ExecutionState2 newExecutionState, String optionalMessage);
+			ExecutionState newExecutionState, String optionalMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
index 20a6180..78ed4d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
@@ -28,7 +28,7 @@ public interface ExecutionObserver {
 	 * @param optionalMessage
 	 *        an optional message providing further information on the state change
 	 */
-	void executionStateChanged(ExecutionState2 newExecutionState, String optionalMessage);
+	void executionStateChanged(ExecutionState newExecutionState, String optionalMessage);
 
 	/**
 	 * Returns whether the task has been canceled.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index 36a8672..6ad936e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -16,67 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.execution;
 
-/**
- * This enumerations includes all possible states during a task's lifetime.
- * 
- */
 public enum ExecutionState {
 
-	/**
-	 * The task has been created, but is not yet submitted to a scheduler.
-	 */
 	CREATED,
-
-	/**
-	 * The task has been accepted by the scheduler, the resource for the task has been requested
-	 */
+	
 	SCHEDULED,
-
-	/**
-	 * The task has been assigned a resource to run, but is not yet read to by deployed.
-	 */
-	ASSIGNED,
-
-	/**
-	 * The task has been announced ready to run by the scheduler, but is not yet running.
-	 */
-	READY,
-
-	/**
-	 * The task is currently deployed to the assigned to task manager.
-	 */
-	STARTING,
-
-	/**
-	 * The task is currently running.
-	 */
+	
+	DEPLOYING,
+	
 	RUNNING,
-
-	/**
-	 * The task has already finished, but not all of its results have been consumed yet.
-	 */
-	FINISHING,
-
-	/**
-	 * The task finished, all of its results have been consumed.
-	 */
+	
 	FINISHED,
-
-	/**
-	 * The task has been requested to be canceled, but is not yet terminated.
-	 */
+	
 	CANCELING,
-
-	/**
-	 * The task has been canceled due to a user request or the error of a connected task.
-	 */
+	
 	CANCELED,
-
-	/**
-	 * The task has been aborted due to a failure during execution.
-	 */
+	
 	FAILED
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
deleted file mode 100644
index c2b2070..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-public enum ExecutionState2 {
-
-	CREATED,
-	
-	SCHEDULED,
-	
-	DEPLOYING,
-	
-	RUNNING,
-	
-	FINISHED,
-	
-	CANCELING,
-	
-	CANCELED,
-	
-	FAILED
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 1076ede..cbcd368 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -364,12 +364,14 @@ public final class LibraryCacheManager {
 
 		// Use spin lock here
 		while (this.lockMap.putIfAbsent(id, LOCK_OBJECT) != null);
-
-		if (decrementReferenceCounter(id) == 0) {
-			this.libraryManagerEntries.remove(id);
+		try {
+			if (decrementReferenceCounter(id) == 0) {
+				this.libraryManagerEntries.remove(id);
+			}
+		}
+		finally {
+			this.lockMap.remove(id);
 		}
-
-		this.lockMap.remove(id);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
index 84781cb..4bf36f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-class AllVerticesIterator implements Iterator<ExecutionVertex2> {
+class AllVerticesIterator implements Iterator<ExecutionVertex> {
 
 	private final Iterator<ExecutionJobVertex> jobVertices;
 	
-	private ExecutionVertex2[] currVertices;
+	private ExecutionVertex[] currVertices;
 	
 	private int currPos;
 	
@@ -56,7 +56,7 @@ class AllVerticesIterator implements Iterator<ExecutionVertex2> {
 	}
 	
 	@Override
-	public ExecutionVertex2 next() {
+	public ExecutionVertex next() {
 		if (hasNext()) {
 			return currVertices[currPos++];
 		} else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
new file mode 100644
index 0000000..8cfc7fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.execution.ExecutionState.*;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
+ * or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
+ * 
+ * NOTE ABOUT THE DESIGN RATIONALE:
+ * 
+ * In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
+ * 
+ * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
+ * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
+ * command" call will never overtake the deploying call.
+ * 
+ * This blocks the threads big time, because the remote calls may take long. Depending on their locking behavior, it
+ * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and
+ * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
+ * actions if it is not. Many actions are also idempotent (like canceling).
+ */
+public class Execution {
+
+	/** Compare-and-set access to the volatile {@link #state} field, used by {@link #transitionState}. */
+	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
+	
+	private static final Logger LOG = ExecutionGraph.LOG;
+	
+	/** Number of times the cancel RPC is attempted before this execution is marked as failed. */
+	private static final int NUM_CANCEL_CALL_TRIES = 3;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private final ExecutionVertex vertex;
+	
+	private final ExecutionAttemptID attemptId;
+	
+	/** Time at which each state was entered, indexed by {@link ExecutionState#ordinal()}; 0 if never recorded. */
+	private final long[] stateTimestamps;
+	
+	private final int attemptNumber;
+	
+	
+	private volatile ExecutionState state = CREATED;
+	
+	private volatile AllocatedSlot assignedResource;  // once assigned, never changes
+	
+	private volatile Throwable failureCause;          // once assigned, never changes
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new execution of the given vertex, in state CREATED.
+	 * 
+	 * @param vertex The vertex executed by this execution. Must not be null.
+	 * @param attemptNumber The number of this attempt. Must be non-negative.
+	 * @param startTimestamp The timestamp recorded for the CREATED state.
+	 */
+	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp) {
+		Preconditions.checkNotNull(vertex);
+		Preconditions.checkArgument(attemptNumber >= 0);
+		
+		this.vertex = vertex;
+		this.attemptId = new ExecutionAttemptID();
+		this.attemptNumber = attemptNumber;
+		
+		this.stateTimestamps = new long[ExecutionState.values().length];
+		markTimestamp(ExecutionState.CREATED, startTimestamp);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//   Properties
+	// --------------------------------------------------------------------------------------------
+	
+	/** Returns the vertex that this is an execution of. */
+	public ExecutionVertex getVertex() {
+		return vertex;
+	}
+	
+	/** Returns the unique id of this execution attempt. */
+	public ExecutionAttemptID getAttemptId() {
+		return attemptId;
+	}
+
+	/** Returns the attempt number passed at construction. */
+	public int getAttemptNumber() {
+		return attemptNumber;
+	}
+	
+	/** Returns the current state of this execution. */
+	public ExecutionState getState() {
+		return state;
+	}
+	
+	/** Returns the slot this execution was deployed to, or null if it has not been assigned one yet. */
+	public AllocatedSlot getAssignedResource() {
+		return assignedResource;
+	}
+	
+	/** Returns the cause of the failure, or null if this execution has not failed. */
+	public Throwable getFailureCause() {
+		return failureCause;
+	}
+	
+	/** Returns the time at which the given state was entered, or 0 if no timestamp was recorded for it. */
+	public long getStateTimestamp(ExecutionState state) {
+		return this.stateTimestamps[state.ordinal()];
+	}
+	
+	/** Checks whether this execution is in a terminal state (FINISHED, FAILED, or CANCELED). */
+	public boolean isFinished() {
+		return state == FINISHED || state == FAILED || state == CANCELED;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Actions
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Moves this execution from CREATED to SCHEDULED and requests a slot for it from the scheduler. Once the
+	 * slot is available, the execution is deployed to it. If the execution was concurrently canceled, this
+	 * method returns without scheduling anything.
+	 * 
+	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the task needs
+	 *       to be scheduled immediately and no resource is available. If the task is accepted by the scheduler, any
+	 *       error sets the vertex state to failed and triggers the recovery logic.
+	 * 
+	 * @param scheduler The scheduler to request the slot from.
+	 * @param queued If true, the slot request may be queued and deployment happens once the slot is allocated.
+	 *               If false, a slot must be available immediately.
+	 * 
+	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
+	 * @throws NoResourceAvailableException Thrown if no queued scheduling is allowed and no resources are currently available.
+	 */
+	public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+		if (scheduler == null) {
+			throw new NullPointerException();
+		}
+		
+		if (transitionState(CREATED, SCHEDULED)) {
+			
+			// record that we were scheduled
+			vertex.notifyStateTransition(attemptId, SCHEDULED, null);
+			
+			ScheduledUnit toSchedule = new ScheduledUnit(this, vertex.getJobVertex().getSlotSharingGroup());
+		
+			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
+			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
+			if (queued) {
+				SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
+				
+				future.setFutureAction(new SlotAllocationFutureAction() {
+					@Override
+					public void slotAllocated(AllocatedSlot slot) {
+						try {
+							deployToSlot(slot);
+						}
+						catch (Throwable t) {
+							try {
+								slot.releaseSlot();
+							} finally {
+								markFailed(t);
+							}
+						}
+					}
+				});
+			}
+			else {
+				// NOTE(review): if scheduleImmediately() throws, the exception propagates to the caller and this
+				// execution stays in state SCHEDULED - confirm that callers fail or reset it in that case.
+				AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule);
+				try {
+					deployToSlot(slot);
+				}
+				catch (Throwable t) {
+					try {
+						slot.releaseSlot();
+					} finally {
+						markFailed(t);
+					}
+				}
+			}
+		}
+		else if (this.state == CANCELED) {
+			// this can occur very rarely through heavy races. if the task was canceled, we do not
+			// schedule it
+			return;
+		}
+		else {
+			throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
+		}
+	}
+	
+	/**
+	 * Deploys this execution to the given slot. The state moves to DEPLOYING synchronously; the submission to
+	 * the TaskManager is handed to {@code vertex.execute(...)} so that this call does not block on the remote
+	 * call. A successful submission moves the state to RUNNING; a failed one marks this execution as failed.
+	 * 
+	 * Errors while assigning the slot or preparing the deployment mark this execution as failed and are
+	 * rethrown via {@code ExceptionUtils.rethrow}.
+	 * 
+	 * @param slot The slot to deploy to. Must not be null and must be alive.
+	 * @throws JobException Thrown, if the slot is not alive.
+	 * @throws IllegalStateException Thrown, if this execution is not in CREATED or SCHEDULED state.
+	 */
+	public void deployToSlot(final AllocatedSlot slot) throws JobException {
+		// sanity checks
+		if (slot == null) {
+			throw new NullPointerException();
+		}
+		if (!slot.isAlive()) {
+			throw new JobException("Target slot for deployment is not alive.");
+		}
+		
+		// make sure exactly one deployment call happens from the correct state
+		// note: the transition from CREATED to DEPLOYING is for testing purposes only
+		ExecutionState previous = this.state;
+		if (previous == SCHEDULED || previous == CREATED) {
+			if (!transitionState(previous, DEPLOYING)) {
+				// race condition, someone else beat us to the deploying call.
+				// this should actually not happen and indicates a race somewhere else
+				throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
+			}
+			
+			vertex.notifyStateTransition(attemptId, DEPLOYING, null);
+		}
+		else {
+			// vertex may have been cancelled, or it was already scheduled
+			throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
+		}
+		
+		try {
+			// good, we are allowed to deploy
+			if (!slot.setExecutedVertex(this)) {
+				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
+			}
+			this.assignedResource = slot;
+			
+			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot);
+			
+			// register this execution at the execution graph, to receive callbacks
+			vertex.getExecutionGraph().registerExecution(this);
+			
+			// we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
+			Runnable deployaction = new Runnable() {
+	
+				@Override
+				public void run() {
+					try {
+						Instance instance = slot.getInstance();
+						instance.checkLibraryAvailability(vertex.getJobId());
+						
+						TaskOperationResult result = instance.getTaskManagerProxy().submitTask(deployment);
+						if (result == null) {
+							markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
+						}
+						else if (!result.getExecutionId().equals(attemptId)) {
+							markFailed(new Exception("Answer execution id does not match the request execution id."));
+						}
+						else if (result.isSuccess()) {
+							switchToRunning();
+						}
+						else {
+							// deployment failed :(
+							markFailed(new Exception("Failed to deploy the task " + getVertexWithAttempt() + " to slot " + slot + ": " + result.getDescription()));
+						}
+					}
+					catch (Throwable t) {
+						// some error occurred. fail the task
+						markFailed(t);
+					}
+				}
+			};
+			
+			vertex.execute(deployaction);
+		}
+		catch (Throwable t) {
+			markFailed(t);
+			ExceptionUtils.rethrow(t);
+		}
+	}
+	
+	/**
+	 * Cancels this execution. Executions that were never deployed (CREATED, SCHEDULED) go directly to
+	 * CANCELED; deploying or running executions go to CANCELING and a cancel call is sent to the TaskManager.
+	 * The call has no effect on executions that are already CANCELING, CANCELED, FINISHED, or FAILED.
+	 * 
+	 * @throws IllegalStateException Thrown, if the execution is found in a state this method does not handle.
+	 */
+	public void cancel() {
+		// depending on the previous state, we go directly to cancelled (no cancel call necessary)
+		// -- or to canceling (cancel call needs to be sent to the task manager)
+		
+		// because of several possibly previous states, we need to again loop until we make a
+		// successful atomic state transition
+		while (true) {
+			
+			ExecutionState current = this.state;
+			
+			if (current == CANCELING || current == CANCELED) {
+				// already taken care of, no need to cancel again
+				return;
+			}
+				
+			// these two are the common cases where we need to send a cancel call
+			else if (current == RUNNING || current == DEPLOYING) {
+				// try to transition to canceling, if successful, send the cancel call
+				if (transitionState(current, CANCELING)) {
+					vertex.notifyStateTransition(attemptId, CANCELING, null);
+					sendCancelRpcCall();
+					return;
+				}
+				// else: fall through the loop
+			}
+			
+			else if (current == FINISHED || current == FAILED) {
+				// nothing to do any more. finished or failed before it could be cancelled.
+				// in any case, the task is removed from the TaskManager already
+				return;
+			}
+			else if (current == CREATED || current == SCHEDULED) {
+				// from here, we can directly switch to cancelled, because no task has been deployed
+				if (transitionState(current, CANCELED)) {
+					
+					// we skip the canceling state. set the timestamp, for a consistent appearance
+					markTimestamp(CANCELING, getStateTimestamp(CANCELED));
+					vertex.notifyStateTransition(attemptId, CANCELED, null);
+					return;
+				}
+				// else: fall through the loop
+			}
+			else {
+				throw new IllegalStateException(current.name());
+			}
+		}
+	}
+	
+	/**
+	 * This method fails the vertex due to an external condition. The task will move to state FAILED.
+	 * If the task was in state RUNNING or DEPLOYING before, it will send a cancel call to the TaskManager.
+	 * Executions that are already FAILED, CANCELED, or FINISHED are not changed.
+	 * 
+	 * @param t The exception that caused the task to fail.
+	 */
+	public void fail(Throwable t) {
+		if (processFail(t, false)) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Task " + getVertexWithAttempt() + " was failed.", t);
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//   Callbacks
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * This method marks the task as failed, but will make no attempt to remove task execution from the task manager.
+	 * It is intended for cases where the task is known not to be running, or when the TaskManager reports failure
+	 * (in which case it has already removed the task).
+	 * 
+	 * @param t The exception that caused the task to fail.
+	 */
+	void markFailed(Throwable t) {
+		// the call returns true if it actually made the state transition (was not already failed before, etc)
+		if (processFail(t, true)) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Task " + getVertexWithAttempt() + " failed.", t);
+			}
+		}
+	}
+	
+	/**
+	 * Callback for a FINISHED report of this task. Moves RUNNING or DEPLOYING to FINISHED, notifies the vertex,
+	 * deregisters this execution and releases its slot. If the execution is concurrently being canceled or has
+	 * failed, the report is ignored; in any other state this execution is marked as failed.
+	 */
+	void markFinished() {
+		// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
+		while (true) {
+			ExecutionState current = this.state;
+			
+			if (current == RUNNING || current == DEPLOYING) {
+			
+				if (transitionState(current, FINISHED)) {
+					try {
+						vertex.notifyStateTransition(attemptId, FINISHED, null);
+						vertex.executionFinished();
+						return;
+					}
+					finally {
+						vertex.getExecutionGraph().deregisterExecution(this);
+						AllocatedSlot slot = assignedResource;
+						if (slot != null) {
+							slot.releaseSlot();
+						}
+					}
+				}
+				// else: concurrent state change, re-check in the next loop iteration
+			}
+			else {
+				// report the state that drove this decision, not a possibly newer re-read of the field
+				if (current == CANCELED || current == CANCELING || current == FAILED) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Task FINISHED, but concurrently went to state " + current);
+					}
+					return;
+				}
+				else {
+					// this should not happen, we need to fail this
+					markFailed(new Exception("Vertex received FINISHED message while being in state " + current));
+					return;
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Callback that completes a cancellation: moves CANCELING to CANCELED, notifies the vertex, deregisters
+	 * this execution and releases its slot. If the transition is not possible and the execution is not FAILED,
+	 * the whole execution graph is failed, because that indicates an unexpected race.
+	 */
+	void cancelingComplete() {
+		if (transitionState(CANCELING, CANCELED)) {
+			try {
+				vertex.executionCanceled();
+				vertex.notifyStateTransition(attemptId, CANCELED, null);
+			}
+			finally {
+				vertex.getExecutionGraph().deregisterExecution(this);
+				AllocatedSlot slot = assignedResource;
+				if (slot != null) {
+					slot.releaseSlot();
+				}
+			}
+		}
+		else {
+			ExecutionState actualState = this.state;
+			// failing in the meantime may happen and is no problem.
+			// anything else is a serious problem !!!
+			if (actualState != FAILED) {
+				String message = String.format("Asynchronous race: Found state %s after successful cancel call.", actualState);
+				LOG.error(message);
+				vertex.getExecutionGraph().fail(new Exception(message));
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Internal Actions
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Atomically moves this execution to FAILED. The failure is ignored if the execution is already FAILED,
+	 * CANCELED or FINISHED, or if it is CANCELING and the failure comes from a callback.
+	 * 
+	 * @param t The failure cause.
+	 * @param isCallback True, if the failure was reported back (no cancel call needed); false, if it is an
+	 *                   external failure, in which case a deploying or running task also gets a cancel call.
+	 * @return True, if this call performed the transition to FAILED; false, if the failure was ignored.
+	 */
+	private boolean processFail(Throwable t, boolean isCallback) {
+		
+		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
+		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
+		
+		// we may need to loop multiple times (in the presence of concurrent calls) in order to
+		// atomically switch to failed 
+		while (true) {
+			ExecutionState current = this.state;
+			
+			if (current == FAILED) {
+				// already failed. It is enough to remember once that we failed (it's sad enough)
+				return false;
+			}
+			
+			// a FINISHED execution is terminal, just like in cancel() and isFinished(). Failing it would
+			// release its slot a second time and report a failure after the vertex was told it finished.
+			if (current == CANCELED || current == FINISHED || (current == CANCELING && isCallback)) {
+				// we are already aborting, are already aborted, or have already finished
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s", 
+							getVertexWithAttempt(), FAILED, current));
+				}
+				return false;
+			}
+			
+			if (transitionState(current, FAILED)) {
+				// success (in a manner of speaking)
+				
+				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
+					}
+					
+					try {
+						if (assignedResource != null) {
+							sendCancelRpcCall();
+						}
+					} catch (Throwable tt) {
+						// no reason this should ever happen, but log it to be safe
+						LOG.error("Error triggering cancel call while marking task as failed.", tt);
+					}
+				}
+				
+				try {
+					this.failureCause = t;
+					vertex.executionFailed(t);
+					vertex.notifyStateTransition(attemptId, FAILED, t);
+				}
+				finally {
+					if (assignedResource != null) {
+						assignedResource.releaseSlot();
+					}
+					vertex.getExecutionGraph().deregisterExecution(this);
+				}
+				
+				// leave the loop
+				return true;
+			}
+		}
+	}
+	
+	/**
+	 * Called after the TaskManager accepted the deployment. Moves DEPLOYING to RUNNING; if the state changed
+	 * concurrently, reacts to the state that is found instead.
+	 */
+	private void switchToRunning() {
+		
+		// transition state, the common case
+		if (transitionState(DEPLOYING, RUNNING)) {
+			vertex.notifyStateTransition(attemptId, RUNNING, null);
+		}
+		else {
+			// something happened while the call was in progress.
+			// it can mean:
+			//  - canceling, while deployment was in progress. state is now canceling, or canceled, if the response overtook
+			//  - finishing (execution and finished call overtook the deployment answer, which is possible and happens for fast tasks)
+			//  - failed (execution, failure, and failure message overtook the deployment answer)
+			
+			ExecutionState currentState = this.state;
+			
+			// the branches below must be mutually exclusive: a FINISHED or CANCELED execution must not
+			// fall into the last branch, which would cancel it and mark it as failed
+			if (currentState == FINISHED || currentState == CANCELED) {
+				// do nothing, this is nice, the task was really fast
+			}
+			else if (currentState == CANCELING || currentState == FAILED) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt()));
+				}
+				sendCancelRpcCall();
+			}
+			else {
+				String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.",
+						getVertexWithAttempt(), currentState);
+				
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(message);
+				}
+				
+				// undo the deployment
+				sendCancelRpcCall();
+				
+				// record the failure
+				markFailed(new Exception(message));
+			}
+		}
+	}
+	
+	/**
+	 * Hands a cancel call for this execution to {@code vertex.execute(...)}. The call targets the TaskManager of
+	 * the assigned slot and is retried up to {@link #NUM_CANCEL_CALL_TRIES} times on errors; if every attempt
+	 * throws, this execution is failed.
+	 * 
+	 * @throws IllegalStateException Thrown, if no slot has been assigned to this execution yet.
+	 */
+	private void sendCancelRpcCall() {
+		final AllocatedSlot slot = this.assignedResource;
+		if (slot == null) {
+			throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
+		}
+		
+		Runnable cancelAction = new Runnable() {
+			
+			@Override
+			public void run() {
+				Throwable exception = null;
+				
+				for (int triesLeft = NUM_CANCEL_CALL_TRIES; triesLeft > 0; --triesLeft) {
+					
+					try {
+						// send the call. it may be that the task is not really there (asynchronous / overtaking messages)
+						// in which case it is fine (the deployer catches it)
+						TaskOperationResult result = slot.getInstance().getTaskManagerProxy().cancelTask(attemptId);
+						
+						if (!result.isSuccess()) {
+							// the task was not found, which may be when the task concurrently finishes or fails, or
+							// when the cancel call overtakes the deployment call
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Cancel task call did not find task. Probably RPC call race.");
+							}
+						}
+						
+						// in any case, we need not call multiple times, so we quit
+						return;
+					}
+					catch (Throwable t) {
+						if (exception == null) {
+							exception = t;
+						}
+						LOG.error("Canceling vertex " + getVertexWithAttempt() + " failed (" + triesLeft + " tries left): " + t.getMessage() , t);
+					}
+				}
+				
+				// dang, utterly unsuccessful - the target node must be down, in which case the tasks are lost anyways
+				fail(new Exception("Task could not be canceled.", exception));
+			}
+		};
+		
+		vertex.execute(cancelAction);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Miscellaneous
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Atomically switches from {@code currentState} to {@code targetState} and records the time of the switch.
+	 * 
+	 * @return True, if the state was {@code currentState} and has been switched; false otherwise.
+	 */
+	private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
+		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
+			markTimestamp(targetState);
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	/** Records the current system time as the timestamp of the given state. */
+	private void markTimestamp(ExecutionState state) {
+		markTimestamp(state, System.currentTimeMillis());
+	}
+	
+	/** Records the given timestamp for the given state. */
+	private void markTimestamp(ExecutionState state, long timestamp) {
+		this.stateTimestamps[state.ordinal()] = timestamp;
+	}
+	
+	/** Returns the vertex name together with the attempt number, as used in log and error messages. */
+	public String getVertexWithAttempt() {
+		return vertex.getSimpleName() + " - execution #" + attemptNumber;
+	}
+	
+	@Override
+	public String toString() {
+		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
+				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
deleted file mode 100644
index b623d6f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttempt.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.runtime.instance.AllocatedSlot;
-
-/**
- * Records one attempt to execute a vertex: its attempt id, the slot it was assigned, its attempt number,
- * its start and end timestamps, and the failure cause if it failed. The attempt is completed at most once,
- * by whichever of {@link #finish()} and {@link #fail(Throwable)} first wins the compare-and-set on {@code finished}.
- */
-public class ExecutionAttempt {
-
-	/** Flipped from false to true exactly once, by the first successful finish() or fail(). */
-	private final AtomicBoolean finished = new AtomicBoolean();
-	
-	private final ExecutionAttemptID attemptId;
-	
-	private final AllocatedSlot assignedResource;
-	
-	private final int attemptNumber;
-	
-	private final long startTimestamp;
-	
-	/** Set only by finish(); remains 0 while running and when the attempt ended through fail(). */
-	private volatile long endTimestamp;
-	
-	/** Set only by fail(); null while running and when the attempt ended through finish(). */
-	private volatile Throwable failureCause;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/** Returns the id of this attempt. */
-	public ExecutionAttemptID getAttemptId() {
-		return attemptId;
-	}
-	
-	/**
-	 * Creates a new, not yet finished attempt. No arguments are validated.
-	 */
-	public ExecutionAttempt(ExecutionAttemptID attemptId, AllocatedSlot assignedResource, int attemptNumber, long startTimestamp) {
-		this.attemptId = attemptId;
-		this.assignedResource = assignedResource;
-		this.attemptNumber = attemptNumber;
-		this.startTimestamp = startTimestamp;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/** Returns the slot passed at construction. */
-	public AllocatedSlot getAssignedResource() {
-		return assignedResource;
-	}
-	
-	/** Returns the attempt number passed at construction. */
-	public int getAttemptNumber() {
-		return attemptNumber;
-	}
-	
-	/** Returns the start timestamp passed at construction. */
-	public long getStartTimestamp() {
-		return startTimestamp;
-	}
-	
-	/** Returns the time recorded by finish(), or 0 if finish() has not successfully completed this attempt. */
-	public long getEndTimestamp() {
-		return endTimestamp;
-	}
-	
-	/** Returns the cause passed to the successful fail() call, or null. */
-	public Throwable getFailureCause() {
-		return failureCause;
-	}
-	
-	/** Returns true once the attempt was completed, either through finish() or through fail(). */
-	public boolean isFinished() {
-		return finished.get();
-	}
-	
-	/**
-	 * Returns true if the attempt is completed and a failure cause is set.
-	 * NOTE(review): fail() sets the flag before the cause, so a concurrent caller may briefly see false for an
-	 * attempt that is being failed; an attempt failed with a null cause also reports false here.
-	 */
-	public boolean isFailed() {
-		return finished.get() && failureCause != null;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Completes the attempt successfully and records the current time as its end timestamp.
-	 * 
-	 * @return True, if this call completed the attempt; false, if it was already completed.
-	 */
-	public boolean finish() {
-		if (finished.compareAndSet(false, true)) {
-			endTimestamp = System.currentTimeMillis();
-			return true;
-		} else {
-			return false;
-		}
-	}
-	
-	/**
-	 * Completes the attempt as failed and records the given cause.
-	 * NOTE(review): unlike finish(), this does not record an end timestamp.
-	 * 
-	 * @return True, if this call completed the attempt; false, if it was already completed.
-	 */
-	public boolean fail(Throwable error) {
-		if (finished.compareAndSet(false, true)) {
-			failureCause = error;
-			return true;
-		} else {
-			return false;
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	// NOTE(review): throws NullPointerException if the attempt was created with a null assignedResource.
-	@Override
-	public String toString() {
-		return String.format("Attempt #%d (%s) @ %s - started %d %s", attemptNumber, attemptId,
-				assignedResource.toString(), startTimestamp, isFinished() ? "finished " + endTimestamp : "[RUNNING]");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
new file mode 100644
index 0000000..f001b6f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.io.network.channels.ChannelID;
+
+/**
+ * Connects one {@link IntermediateResultPartition} (the source) with one target {@link ExecutionVertex}
+ * and carries the {@link ChannelID}s of the input and the output side of that connection.
+ * All fields are final and set at construction.
+ */
+public class ExecutionEdge {
+
+	/** The partition this edge starts at. */
+	private final IntermediateResultPartition source;
+	
+	/** The vertex this edge leads to. */
+	private final ExecutionVertex target;
+	
+	/** The input number passed at construction (presumably the target's input index - confirm with callers). */
+	private final int inputNum;
+
+	/** Channel id of the input side of this edge. */
+	private final ChannelID inputChannelId;
+	
+	/** Channel id of the output side of this edge. */
+	private final ChannelID outputChannelId;
+	
+	
+	/**
+	 * Creates an edge with newly generated input and output channel ids.
+	 */
+	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
+		// arguments are evaluated left to right, so the input id is created before the output id
+		this(source, target, inputNum, new ChannelID(), new ChannelID());
+	}
+	
+	/**
+	 * Creates an edge that uses the given input and output channel ids.
+	 */
+	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum,
+			ChannelID inputChannelId, ChannelID outputChannelId) {
+		this.source = source;
+		this.target = target;
+		this.inputNum = inputNum;
+		this.inputChannelId = inputChannelId;
+		this.outputChannelId = outputChannelId;
+	}
+	
+	
+	/** Returns the partition this edge starts at. */
+	public IntermediateResultPartition getSource() {
+		return this.source;
+	}
+	
+	/** Returns the vertex this edge leads to. */
+	public ExecutionVertex getTarget() {
+		return this.target;
+	}
+	
+	/** Returns the input number passed at construction. */
+	public int getInputNum() {
+		return this.inputNum;
+	}
+	
+	/** Returns the channel id of the input side. */
+	public ChannelID getInputChannelId() {
+		return this.inputChannelId;
+	}
+	
+	/** Returns the channel id of the output side. */
+	public ChannelID getOutputChannelId() {
+		return this.outputChannelId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge2.java
deleted file mode 100644
index a7cbeaf..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge2.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-/**
- * Connects one {@link IntermediateResultPartition} (the source) with one target {@link ExecutionVertex2}
- * and carries the {@link ChannelID}s of the input and the output side of that connection.
- */
-public class ExecutionEdge2 {
-
-	private final IntermediateResultPartition source;
-	
-	private final ExecutionVertex2 target;
-	
-	// presumably the index of the target's input this edge belongs to - TODO confirm with callers
-	private final int inputNum;
-
-	private final ChannelID inputChannelId;
-	
-	private final ChannelID outputChannelId;
-	
-	
-	/** Creates an edge with newly generated input and output channel ids. */
-	public ExecutionEdge2(IntermediateResultPartition source, ExecutionVertex2 target, int inputNum) {
-		this.source = source;
-		this.target = target;
-		this.inputNum = inputNum;
-		
-		this.inputChannelId = new ChannelID();
-		this.outputChannelId = new ChannelID();
-	}
-	
-	/** Creates an edge that uses the given input and output channel ids. */
-	public ExecutionEdge2(IntermediateResultPartition source, ExecutionVertex2 target, int inputNum, ChannelID inputChannelId, ChannelID outputChannelId) {
-		this.source = source;
-		this.target = target;
-		this.inputNum = inputNum;
-		
-		this.inputChannelId = inputChannelId;
-		this.outputChannelId = outputChannelId;
-	}
-	
-	
-	/** Returns the partition this edge starts at. */
-	public IntermediateResultPartition getSource() {
-		return source;
-	}
-	
-	/** Returns the vertex this edge leads to. */
-	public ExecutionVertex2 getTarget() {
-		return target;
-	}
-	
-	/** Returns the input number passed at construction. */
-	public int getInputNum() {
-		return inputNum;
-	}
-	
-	/** Returns the channel id of the input side. */
-	public ChannelID getInputChannelId() {
-		return inputChannelId;
-	}
-	
-	/** Returns the channel id of the output side. */
-	public ChannelID getOutputChannelId() {
-		return outputChannelId;
-	}
-}


Mime
View raw message