flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/7] flink git commit: [FLINK-1415] [runtime] Clean up archiving of ExecutionGraphs
Date Thu, 05 Feb 2015 12:25:43 GMT
[FLINK-1415] [runtime] Clean up archiving of ExecutionGraphs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae0dc2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae0dc2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae0dc2d

Branch: refs/heads/master
Commit: 8ae0dc2d768aecfa3129df553f43d827792b65d7
Parents: db1b8b9
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 4 13:40:13 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 5 12:17:15 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 35 ++++++++---
 .../runtime/executiongraph/ExecutionGraph.java  | 51 ++++++++++++----
 .../executiongraph/ExecutionJobVertex.java      | 41 +++++++++++--
 .../runtime/executiongraph/ExecutionVertex.java | 50 ++++++++++++----
 .../apache/flink/runtime/instance/Instance.java | 62 ++++++++++----------
 .../flink/runtime/instance/SharedSlot.java      |  4 +-
 .../flink/runtime/instance/SimpleSlot.java      |  6 +-
 .../org/apache/flink/runtime/instance/Slot.java | 32 ++++++----
 .../scheduler/CoLocationConstraint.java         |  6 +-
 .../scheduler/NoResourceAvailableException.java |  9 ++-
 .../runtime/jobmanager/scheduler/Scheduler.java |  1 -
 .../scheduler/SlotAvailabilityListener.java     |  4 +-
 .../jobmanager/web/JobManagerInfoServlet.java   | 27 ++++-----
 .../runtime/jobmanager/web/JsonFactory.java     |  7 +--
 .../runtime/jobmanager/web/WebInfoServer.java   |  6 --
 .../flink/runtime/jobmanager/JobManager.scala   |  5 +-
 .../runtime/jobmanager/MemoryArchivist.scala    | 14 ++---
 .../runtime/messages/ArchiveMessages.scala      |  9 +++
 .../runtime/messages/JobmanagerMessages.scala   | 31 ++++++++++
 .../test/javaApiOperators/DataSinkITCase.java   |  2 +-
 20 files changed, 275 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e1a24c4..27977c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -22,12 +22,14 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -40,6 +42,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -78,7 +81,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
  */
 public class Execution implements Serializable {
 
-	static final long serialVersionUID = 42L;
+	private static final long serialVersionUID = 42L;
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -97,22 +100,25 @@ public class Execution implements Serializable {
 
 	private final int attemptNumber;
 
-	public FiniteDuration timeout;
+	private final FiniteDuration timeout;
 
 
 	private volatile ExecutionState state = CREATED;
 	
-	private volatile SimpleSlot assignedResource;  // once assigned, never changes
+	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
 	
 	private volatile Throwable failureCause;          // once assigned, never changes
 	
+	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
+	
 	// --------------------------------------------------------------------------------------------
 	
-	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp,
-					FiniteDuration timeout) {
+	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
+		checkArgument(attemptNumber >= 0);
+		
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
-		checkArgument(attemptNumber >= 0);
+		
 		this.attemptNumber = attemptNumber;
 
 		this.stateTimestamps = new long[ExecutionState.values().length];
@@ -145,6 +151,10 @@ public class Execution implements Serializable {
 		return assignedResource;
 	}
 	
+	public InstanceConnectionInfo getAssignedResourceLocation() {
+		return assignedResourceLocation;
+	}
+	
 	public Throwable getFailureCause() {
 		return failureCause;
 	}
@@ -161,6 +171,16 @@ public class Execution implements Serializable {
 		return state == FINISHED || state == FAILED || state == CANCELED;
 	}
 	
+	/**
+	 * This method cleans fields that are irrelevant for the archived execution attempt.
+	 */
+	public void prepareForArchiving() {
+		if (assignedResource != null && assignedResource.isAlive()) {
+			throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
+		}
+		assignedResource = null;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -267,6 +287,7 @@ public class Execution implements Serializable {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 			this.assignedResource = slot;
+			this.assignedResourceLocation = slot.getInstance().getInstanceConnectionInfo();
 
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 54c5d3a..3f857e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -54,7 +56,7 @@ import static akka.dispatch.Futures.future;
 
 public class ExecutionGraph implements Serializable {
 
-	static final long serialVersionUID = 42L;
+	private static final long serialVersionUID = 42L;
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -71,10 +73,10 @@ public class ExecutionGraph implements Serializable {
 	private final String jobName;
 
 	/** The job configuration that was originally attached to the JobGraph. */
-	private transient final Configuration jobConfiguration;
+	private final Configuration jobConfiguration;
 
 	/** The classloader of the user code. */
-	private final ClassLoader userClassLoader;
+	private ClassLoader userClassLoader;
 
 	/** All job vertices that are part of this graph */
 	private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
@@ -83,20 +85,20 @@ public class ExecutionGraph implements Serializable {
 	private final List<ExecutionJobVertex> verticesInCreationOrder;
 
 	/** All intermediate results that are part of this graph */
-	private transient final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
+	private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
 
 	/** The currently executed tasks, for callbacks */
-	private transient final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
+	private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
 
-	private transient final List<BlobKey> requiredJarFiles;
+	private final List<BlobKey> requiredJarFiles;
 
-	private transient final List<ActorRef> jobStatusListenerActors;
+	private final List<ActorRef> jobStatusListenerActors;
 
-	private transient final List<ActorRef> executionListenerActors;
+	private final List<ActorRef> executionListenerActors;
 
 	private final long[] stateTimestamps;
 
-	private transient final Object progressLock = new Object();
+	private final Object progressLock = new Object();
 
 	private int nextVertexToFinish;
 
@@ -110,12 +112,13 @@ public class ExecutionGraph implements Serializable {
 
 	private volatile Throwable failureCause;
 
-	private transient Scheduler scheduler;
+	private Scheduler scheduler;
 
 	private boolean allowQueuedScheduling = true;
 
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
+	
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
 		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
 	}
@@ -598,8 +601,11 @@ public class ExecutionGraph implements Serializable {
 	public void restart() {
 		try {
 			if (state == JobStatus.FAILED) {
-				transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
+				if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
+					throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
+				}
 			}
+			
 			synchronized (progressLock) {
 				if (state != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");
@@ -627,4 +633,27 @@ public class ExecutionGraph implements Serializable {
 			fail(t);
 		}
 	}
+	
+	/**
+	 * This method cleans fields that are irrelevant for the archived execution graph.
+	 */
+	public void prepareForArchiving() {
+		if (!state.isTerminalState()) {
+			throw new IllegalStateException("Can only archive the job from a terminal state");
+		}
+		
+		userClassLoader = null;
+		
+		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
+			vertex.prepareForArchiving();
+		}
+		
+		intermediateResults.clear();
+		currentExecutions.clear();
+		requiredJarFiles.clear();
+		jobStatusListenerActors.clear();
+		executionListenerActors.clear();
+		
+		scheduler = null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 7eaa1e6..e2febc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -42,12 +42,13 @@ import scala.concurrent.duration.FiniteDuration;
 
 
 public class ExecutionJobVertex implements Serializable {
-	static final long serialVersionUID = 42L;
+	
+	private static final long serialVersionUID = 42L;
 	
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
 	
-	private transient final Object stateMonitor = new Object();
+	private final Object stateMonitor = new Object();
 	
 	private final ExecutionGraph graph;
 	
@@ -55,9 +56,9 @@ public class ExecutionJobVertex implements Serializable {
 	
 	private final ExecutionVertex[] taskVertices;
 
-	private transient final IntermediateResult[] producedDataSets;
+	private IntermediateResult[] producedDataSets;
 	
-	private transient final List<IntermediateResult> inputs;
+	private final List<IntermediateResult> inputs;
 	
 	private final int parallelism;
 	
@@ -71,7 +72,7 @@ public class ExecutionJobVertex implements Serializable {
 	
 	private final InputSplit[] inputSplits;
 	
-	private transient InputSplitAssigner splitAssigner;
+	private InputSplitAssigner splitAssigner;
 	
 	
 	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
@@ -308,6 +309,36 @@ public class ExecutionJobVertex implements Serializable {
 		}
 	}
 	
+	/**
+	 * This method cleans fields that are irrelevant for the archived job vertex.
+	 */
+	public void prepareForArchiving() {
+		
+		for (ExecutionVertex vertex : taskVertices) {
+			vertex.prepareForArchiving();
+		}
+		
+		// clear intermediate results
+		inputs.clear();
+		producedDataSets = null;
+		
+		// reset shared groups
+		if (slotSharingGroup != null) {
+			slotSharingGroup.clearTaskAssignment();
+		}
+		if (coLocationGroup != null) {
+			coLocationGroup.resetConstraints();
+		}
+		
+		// reset splits and split assigner
+		splitAssigner = null;
+		if (inputSplits != null) {
+			for (int i = 0; i < inputSplits.length; i++) {
+				inputSplits[i] = null;
+			}
+		}
+	}
+	
 	//---------------------------------------------------------------------------------------------
 	//  Notifications
 	//---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d3993bb..1092f89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -56,9 +58,8 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  */
 public class ExecutionVertex implements Serializable {
 
-	static final long serialVersionUID = 42L;
+	private static final long serialVersionUID = 42L;
 
-	@SuppressWarnings("unused")
 	private static final Logger LOG = ExecutionGraph.LOG;
 	
 	private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
@@ -67,9 +68,9 @@ public class ExecutionVertex implements Serializable {
 	
 	private final ExecutionJobVertex jobVertex;
 	
-	private transient final IntermediateResultPartition[] resultPartitions;
+	private IntermediateResultPartition[] resultPartitions;
 	
-	private transient ExecutionEdge[][] inputEdges;
+	private ExecutionEdge[][] inputEdges;
 	
 	private final int subTaskIndex;
 	
@@ -182,6 +183,10 @@ public class ExecutionVertex implements Serializable {
 		return currentExecution.getAssignedResource();
 	}
 	
+	public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
+		return currentExecution.getAssignedResourceLocation();
+	}
+	
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}
@@ -322,6 +327,11 @@ public class ExecutionVertex implements Serializable {
 	// --------------------------------------------------------------------------------------------
 
 	public void resetForNewExecution() {
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Resetting exection vertex {} for new execution.", getSimpleName());
+		}
+		
 		synchronized (priorExecutions) {
 			Execution execution = currentExecution;
 			ExecutionState state = execution.getState();
@@ -369,6 +379,31 @@ public class ExecutionVertex implements Serializable {
 
 		return currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
 	}
+	
+	/**
+	 * This method cleans fields that are irrelevant for the archived execution vertex.
+	 */
+	public void prepareForArchiving() {
+		Execution execution = currentExecution;
+		ExecutionState state = execution.getState();
+
+		// sanity check
+		if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
+			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
+		}
+		
+		// prepare the current execution for archiving
+		execution.prepareForArchiving();
+		
+		// prepare previous executions for archiving
+		for (Execution exec : priorExecutions) {
+			exec.prepareForArchiving();
+		}
+		
+		// clear the unnecessary fields in this class
+		this.resultPartitions = null;
+		this.inputEdges = null;
+	}
 
 	// --------------------------------------------------------------------------------------------
 	//   Notifications from the Execution Attempt
@@ -447,13 +482,6 @@ public class ExecutionVertex implements Serializable {
 		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
 	}
 
-	/*
-	 * Clears all Edges of this ExecutionVertex
-	 */
-	public void clearExecutionEdges() {
-		inputEdges = null;
-	}
-
 	@Override
 	public String toString() {
 		return getSimpleName();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 4f9dc7f..a5a9263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -33,17 +32,16 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 
 /**
- * An taskManager represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
+ * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
+ * registered at a JobManager and ready to receive work.
  */
-public class Instance implements Serializable {
-
-	static final long serialVersionUID = 42L;
+public class Instance {
 	
 	/** The lock on which to synchronize allocations and failure state changes */
-	private transient final Object instanceLock = new Object();
+	private final Object instanceLock = new Object();
 	
 	/** The actor ref to the task manager represented by this taskManager. */
-	private transient final ActorRef taskManager;
+	private final ActorRef taskManager;
 
 	/** The instance connection information for the data transfer. */
 	private final InstanceConnectionInfo connectionInfo;
@@ -58,28 +56,27 @@ public class Instance implements Serializable {
 	private final int numberOfSlots;
 
 	/** A list of available slot positions */
-	private transient final Queue<Integer> availableSlots;
+	private final Queue<Integer> availableSlots;
 	
 	/** Allocated slots on this taskManager */
 	private final Set<Slot> allocatedSlots = new HashSet<Slot>();
 
-	
 	/** A listener to be notified upon new slot availability */
-	private transient SlotAvailabilityListener slotAvailabilityListener;
+	private SlotAvailabilityListener slotAvailabilityListener;
 	
-	/**
-	 * Time when last heat beat has been received from the task manager running on this taskManager.
-	 */
+	/** Time when last heart beat has been received from the task manager running on this taskManager. */
 	private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
 	
+	/** Flag marking the instance as alive or as dead. */
 	private volatile boolean isDead;
 
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Constructs an abstract taskManager object.
+	 * Constructs an instance reflecting a registered TaskManager.
 	 * 
 	 * @param taskManager The actor reference of the represented task manager.
+	 * @param connectionInfo The remote connection where the task manager receives requests.
 	 * @param id The id under which the taskManager is registered.
 	 * @param resources The resources available on the machine.
 	 * @param numberOfSlots The number of task slots offered by this taskManager.
@@ -123,15 +120,23 @@ public class Instance implements Serializable {
 	}
 
 	public void markDead() {
+		
+		// create a copy of the slots to avoid concurrent modification exceptions
+		List<Slot> slots;
+		
 		synchronized (instanceLock) {
 			if (isDead) {
 				return;
 			}
-
 			isDead = true;
 
 			// no more notifications for the slot releasing
 			this.slotAvailabilityListener = null;
+			
+			slots = new ArrayList<Slot>(allocatedSlots);
+			
+			allocatedSlots.clear();
+			availableSlots.clear();
 		}
 
 		/*
@@ -139,14 +144,9 @@ public class Instance implements Serializable {
 		 * owning the assignment group lock wants to give itself back to the instance which requires
 		 * the instance lock
 		 */
-		for (Slot slot : allocatedSlots) {
+		for (Slot slot : slots) {
 			slot.releaseSlot();
 		}
-
-		synchronized (instanceLock) {
-			allocatedSlots.clear();
-			availableSlots.clear();
-		}
 	}
 
 
@@ -185,9 +185,9 @@ public class Instance implements Serializable {
 	// --------------------------------------------------------------------------------------------
 	// Resource allocation
 	// --------------------------------------------------------------------------------------------
-
+	
 	public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
-		return allocateSimpleSlot(jobID, jobID);
+		return allocateSimpleSlot(jobID, new AbstractID());
 	}
 	
 	public SimpleSlot allocateSimpleSlot(JobID jobID, AbstractID groupID) throws InstanceDiedException {
@@ -211,8 +211,9 @@ public class Instance implements Serializable {
 		}
 	}
 
-	public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID) throws
-	InstanceDiedException {
+	public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID)
+			throws InstanceDiedException
+	{
 		// the slot needs to be in the returned to taskManager state
 		if (jobID == null) {
 			throw new IllegalArgumentException();
@@ -227,8 +228,7 @@ public class Instance implements Serializable {
 			if (nextSlot == null) {
 				return null;
 			} else {
-				SharedSlot slot = new SharedSlot(jobID, this, nextSlot,
-						sharingGroupAssignment, null, groupID);
+				SharedSlot slot = new SharedSlot(jobID, this, nextSlot, sharingGroupAssignment, null, groupID);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -267,10 +267,10 @@ public class Instance implements Serializable {
 	}
 	
 	public void cancelAndReleaseAllSlots() {
-		List<Slot> copy = null;
-
+		
+		// we need to do this copy because of concurrent modification exceptions
+		List<Slot> copy;
 		synchronized (instanceLock) {
-			// we need to do this copy because of concurrent modification exceptions
 			copy = new ArrayList<Slot>(this.allocatedSlots);
 		}
 			
@@ -329,7 +329,7 @@ public class Instance implements Serializable {
 	
 	@Override
 	public String toString() {
-		return instanceId + " @" + (taskManager != null ? taskManager.path() : "ActorRef.noSender") + " " +
+		return instanceId + " @ " + (taskManager != null ? taskManager.path() : "ActorRef.noSender") + " - " +
 				numberOfSlots + " slots" + " - " + hashCode();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 2efcf6c..576db22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -117,7 +117,7 @@ public class SharedSlot extends Slot {
 		if(isDead()){
 			return null;
 		} else {
-			SimpleSlot slot = new SimpleSlot(jobID, instance, subSlots.size(), this, jID);
+			SimpleSlot slot = new SimpleSlot(getJobID(), getInstance(), subSlots.size(), this, jID);
 			subSlots.add(slot);
 
 			return slot;
@@ -128,7 +128,7 @@ public class SharedSlot extends Slot {
 		if(isDead()){
 			return null;
 		} else {
-			SharedSlot slot = new SharedSlot(jobID, instance, subSlots.size(), assignmentGroup, this, jID);
+			SharedSlot slot = new SharedSlot(getJobID(), getInstance(), subSlots.size(), assignmentGroup, this, jID);
 			subSlots.add(slot);
 
 			return slot;

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 5b1af57..1d42f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -71,7 +71,7 @@ public class SimpleSlot extends Slot {
 		}
 
 		// check that we can actually run in this slot
-		if (status != ALLOCATED_AND_ALIVE) {
+		if (getStatus() != ALLOCATED_AND_ALIVE) {
 			return false;
 		}
 
@@ -81,7 +81,7 @@ public class SimpleSlot extends Slot {
 		}
 
 		// we need to do a double check that we were not cancelled in the meantime
-		if (status != ALLOCATED_AND_ALIVE) {
+		if (getStatus() != ALLOCATED_AND_ALIVE) {
 			this.executedTask = null;
 			return false;
 		}
@@ -112,7 +112,7 @@ public class SimpleSlot extends Slot {
 				getParent().disposeChild(this);
 			} else {
 				// we have to give back the slot to the owning instance
-				instance.returnAllocatedSlot(this);
+				getInstance().returnAllocatedSlot(this);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index fb62c4c..2cf727c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -24,10 +24,12 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
- * Base class for slots.
+ * Base class for task slots. TaskManagers offer one or more task slots, they define how many
+ * parallel tasks or task groups a TaskManager executes.
  */
 public abstract class Slot {
-	protected static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
+	
+	private static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER =
 			AtomicIntegerFieldUpdater.newUpdater(Slot.class, "status");
 
 	protected static final int ALLOCATED_AND_ALIVE = 0;		// tasks may be added and might be running
@@ -35,25 +37,27 @@ public abstract class Slot {
 	protected static final int RELEASED = 2;					// has been given back to the instance
 
 	/** The ID of the job this slice belongs to. */
-	protected final JobID jobID;
+	private final JobID jobID;
 
+	/** The id of the group that this slot is allocated to */
+	private final AbstractID groupID;
+	
 	/** The instance on which the slot is allocated */
-	protected final Instance instance;
+	private final Instance instance;
+	
+	/** The parent of this slot in the hierarchy, or null, if this is the parent */
+	private final SharedSlot parent;
 
 	/** The number of the slot on which the task is deployed */
-	protected final int slotNumber;
+	private final int slotNumber;
 
 	/** The state of the vertex, only atomically updated */
-	protected volatile int status = ALLOCATED_AND_ALIVE;
+	private volatile int status = ALLOCATED_AND_ALIVE;
 
 	/** Indicates whether this slot was marked dead by the system */
-	private boolean dead = false;
+	private volatile boolean dead = false;
 
-	private final AbstractID groupID;
-
-	private final SharedSlot parent;
-
-	private boolean disposed = false;
+	private volatile boolean disposed = false;
 
 
 	public Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID) {
@@ -102,6 +106,10 @@ public abstract class Slot {
 			return parent.getRoot();
 		}
 	}
+	
+	public int getStatus() {
+		return status;
+	}
 
 	public abstract int getNumberLeaves();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 8ef61b9..902b5a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -24,11 +24,7 @@ import org.apache.flink.runtime.instance.Instance;
 import com.google.common.base.Preconditions;
 import org.apache.flink.runtime.instance.SharedSlot;
 
-import java.io.Serializable;
-
-public class CoLocationConstraint implements Serializable {
-
-	static final long serialVersionUID = 42L;
+public class CoLocationConstraint {
 	
 	private final CoLocationGroup group;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 93b4541..2b86c43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -54,7 +54,9 @@ public class NoResourceAvailableException extends JobException {
 	public NoResourceAvailableException(String message) {
 		super(message);
 	}
-
+	
+	// --------------------------------------------------------------------------------------------
+	
 	@Override
 	public boolean equals(Object obj){
 		if(obj == null){
@@ -67,4 +69,9 @@ public class NoResourceAvailableException extends JobException {
 			return getMessage().equals(((NoResourceAvailableException)obj).getMessage());
 		}
 	}
+	
+	@Override
+	public int hashCode() {
+		return getMessage().hashCode();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index c237aa5..1ad7a50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -408,7 +408,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 * local instance. If no such instance exists (all slots occupied), then return null.
 	 *
 	 * @param requestedLocations
-	 * @return
 	 */
 	private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){
 		if (this.instancesWithAvailableResources.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
index f75f294..0f02748 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 import org.apache.flink.runtime.instance.Instance;
 
 /**
- * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.AllocatedSlot2}s become available
- * on an {@link org.apache.flink.runtime.instance.Instance}.
+ * A SlotAvailabilityListener can be notified when new
+ * {@link org.apache.flink.runtime.instance.Slot}s become available on an {@link Instance}.
  */
 public interface SlotAvailabilityListener {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 1492ae1..6de3b39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -34,16 +34,15 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
-import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$;
+import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults;
@@ -64,6 +63,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
+
 import scala.concurrent.duration.FiniteDuration;
 
 public class JobManagerInfoServlet extends HttpServlet {
@@ -94,7 +94,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 		try {
 			if("archive".equals(req.getParameter("get"))) {
 				List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils
-						.<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout)
+						.<ArchivedJobs>ask(archive, ArchiveMessages.getRequestArchivedJobs(), timeout)
 						.asJavaCollection());
 
 				writeJsonForArchive(resp.getWriter(), archivedJobs);
@@ -129,9 +129,9 @@ public class JobManagerInfoServlet extends HttpServlet {
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
 				int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager,
-						RequestNumberRegisteredTaskManager$.MODULE$, timeout);
+						JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout);
 				int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager,
-						RequestTotalNumberOfSlots$.MODULE$, timeout);
+						JobManagerMessages.getRequestTotalNumberOfSlots(), timeout);
 
 				resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " +
 						"\"slots\": "+numberOfRegisteredSlots+"}");
@@ -149,7 +149,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 			}
 			else{
 				Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask
-						(jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
+						(jobmanager, JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable();
 				writeJsonForJobs(resp.getWriter(), runningJobs);
 			}
 			
@@ -292,17 +292,16 @@ public class JobManagerInfoServlet extends HttpServlet {
 				boolean first = true;
 				for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
 					if (vertex.getExecutionState() == ExecutionState.FAILED) {
-						SimpleSlot slot = vertex.getCurrentAssignedResource();
+						InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
 						Throwable failureCause = vertex.getFailureCause();
-						if (slot != null || failureCause != null) {
+						if (location != null || failureCause != null) {
 							if (first) {
 								first = false;
 							} else {
 								wrt.write(",");
 							}
 							wrt.write("{");
-							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot
-									.getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\",");
+							wrt.write("\"node\": \"" + (location == null ? "(none)" : location.getFQDNHostname()) + "\",");
 							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\"");
 							wrt.write("}");
 						}
@@ -421,7 +420,7 @@ public class JobManagerInfoServlet extends HttpServlet {
 		
 		try {
 			Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager,
-					RequestRunningJobs$.MODULE$, timeout).asJavaIterable();
+					JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable();
 			
 			//Serialize job to json
 			wrt.write("{");

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 8e46692..89e55d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
@@ -38,9 +38,8 @@ public class JsonFactory {
 		json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
 		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
 		
-		SimpleSlot slot = vertex.getCurrentAssignedResource();
-		String instanceName = slot == null ? "(null)" : slot.getInstance()
-				.getInstanceConnectionInfo().getFQDNHostname();
+		InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+		String instanceName = location == null ? "(null)" : location.getFQDNHostname();
 		
 		json.append("\"vertexinstancename\": \"" + instanceName + "\"");
 		json.append("}");

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 71347ee..6f3720e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -64,11 +64,6 @@ public class WebInfoServer {
 	private final Server server;
 
 	/**
-	 * Timeout for akka requests
-	 */
-	private final FiniteDuration timeout;
-
-	/**
 	 * Port for info server
 	 */
 	private int port;
@@ -92,7 +87,6 @@ public class WebInfoServer {
 		
 		this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
-		this.timeout = timeout;
 
 		// get base path of Flink installation
 		final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 532f7f8..79b9a74 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -460,7 +460,10 @@ class JobManager(val configuration: Configuration)
    */
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.remove(jobID) match {
-      case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg)
+      case Some((eg, _)) => {
+        eg.prepareForArchiving()
+        archive ! ArchiveExecutionGraph(jobID, eg)
+      }
       case None =>
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 5ca8fb3..28e960d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -42,13 +42,7 @@ ActorLogging {
       // wrap graph inside a soft reference
       graphs.update(jobID, new SoftReference(graph))
 
-      // clear all execution edges of the graph
-      val iter = graph.getAllExecutionVertices().iterator()
-      while (iter.hasNext) {
-        iter.next().clearExecutionEdges()
-      }
-
-      cleanup(jobID)
+      trimHistory()
     }
 
     case RequestArchivedJobs => {
@@ -89,17 +83,17 @@ ActorLogging {
       case Some(graph) => graph
       case None => null
     }
+    case None => null
   }
 
   /**
    * Remove old ExecutionGraphs belonging to a jobID
    * * if more than max_entries are in the queue.
-   * @param jobID
    */
-  private def cleanup(jobID: JobID): Unit = {
+  private def trimHistory(): Unit = {
     while (graphs.size > max_entries) {
       // get first graph inserted
-      val (jobID, value) = graphs.iterator.next()
+      val (jobID, value) = graphs.head
       graphs.remove(jobID)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index 03a5351..884bc2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobID
  * This object contains the archive specific messages.
  */
 object ArchiveMessages {
+  
   case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
 
   /**
@@ -47,4 +48,12 @@ object ArchiveMessages {
       jobs.asJavaCollection
     }
   }
+  
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestArchivedJobs() : AnyRef = {
+    RequestArchivedJobs
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index fbacbd2..7ce013b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -326,4 +326,35 @@ object JobManagerMessages {
 
   case object JobManagerStatusAlive extends JobManagerStatus
 
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestNumberRegisteredTaskManager() : AnyRef = {
+    RequestNumberRegisteredTaskManager
+  }
+  
+  def getRequestTotalNumberOfSlots() : AnyRef = {
+    RequestTotalNumberOfSlots
+  }
+  
+  def getRequestBlobManagerPort() : AnyRef = {
+    RequestBlobManagerPort
+  }
+  
+  def getRequestRunningJobs() : AnyRef = {
+    RequestRunningJobs
+  }
+  
+  def getRequestRegisteredTaskManagers() : AnyRef = {
+    RequestRegisteredTaskManagers
+  }
+  
+  def getRequestJobManagerStatus() : AnyRef = {
+    RequestJobManagerStatus
+  }
+  
+  def getJobManagerStatusAlive() : AnyRef = {
+    JobManagerStatusAlive
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index ca9dc16..d8663e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for data sinks
  */
-
+@SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class DataSinkITCase extends MultipleProgramsTestBase {
 


Mime
View raw message