flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [11/63] [abbrv] Refactor job graph construction to incremental attachment based
Date Sun, 21 Sep 2014 02:12:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
index 09a6f5b..d703b4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -1,59 +1,259 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.commons.logging.Log;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.runtime.execution.ExecutionState2.*;
 
+/**
+ * 
+ * NOTE ABOUT THE DESIGN RATIONALE:
+ * 
+ * In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
+ * 
+ * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
+ * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
+ * command" call will never overtake the deploying call.
+ * 
+ * This would block the threads for a long time, because the remote calls may take long. Depending on their locking behavior, it
+ * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and
+ * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
+ * actions if it is not. Many actions are also idempotent (like canceling).
+ */
 public class ExecutionVertex2 {
+	
+	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionState2> STATE_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionState2.class, "state");
+	
+	private static final AtomicReferenceFieldUpdater<ExecutionVertex2, AllocatedSlot> ASSIGNED_SLOT_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, AllocatedSlot.class, "assignedSlot");
 
-	private final JobVertexID jobVertexId;
+	private static final Log LOG = ExecutionGraph.LOG;
 	
-			
-			
-	public ExecutionVertex2() {
-		this(new JobVertexID());
-	}
+	private static final int NUM_CANCEL_CALL_TRIES = 3;
+	
+	// --------------------------------------------------------------------------------------------
 	
-	public ExecutionVertex2(JobVertexID jobVertexId) {
-		this.jobVertexId = jobVertexId;
+	private final ExecutionJobVertex jobVertex;
+	
+	private final IntermediateResultPartition[] resultPartitions;
+	
+	private final ExecutionEdge2[][] inputEdges;
+	
+	private final int subTaskIndex;
+	
+	
+	private volatile ExecutionState2 state = CREATED;
+	
+	private volatile AllocatedSlot assignedSlot;
+	
+	private volatile Throwable failureCause;
+	
+	
+	/**
+	 * Creates the execution vertex for one parallel subtask of the given job vertex.
+	 * 
+	 * For every produced data set, a partition owned by this vertex is created and registered with the
+	 * data set at this subtask's index. The input edge arrays are only sized here (one entry per input of
+	 * the job vertex); the entries are filled by {@link #connectSource}.
+	 * 
+	 * @param jobVertex The job vertex this vertex is a parallel subtask of.
+	 * @param subTaskIndex The index of this subtask among the job vertex's parallel subtasks.
+	 * @param producedDataSets The intermediate results produced by the job vertex.
+	 */
+	public ExecutionVertex2(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
+		this.jobVertex = jobVertex;
+		this.subTaskIndex = subTaskIndex;
+		
+		// one result partition per produced data set; the partition is also registered with its data set
+		this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
+		for (int i = 0; i < producedDataSets.length; i++) {
+			IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
+			this.resultPartitions[i] = irp;
+			producedDataSets[i].setPartition(subTaskIndex, irp);
+		}
+		
+		// only the outer array is allocated here; connectSource() fills in the edges per input
+		this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
 	}
 	
 	
+	// --------------------------------------------------------------------------------------------
+	//  Properties
+	// --------------------------------------------------------------------------------------------
 	
 	public JobID getJobId() {
-		return new JobID();
+		// the job id is taken from the owning job vertex
+		return this.jobVertex.getJobId();
 	}
 	
-	
 	public JobVertexID getJobvertexId() {
-		return this.jobVertexId;
+		// id of the job vertex this is a parallel subtask of
+		return this.jobVertex.getJobVertexId();
 	}
 	
 	public String getTaskName() {
-		return "task";
+		// the task name is the name of the underlying job graph vertex
+		return this.jobVertex.getJobVertex().getName();
 	}
 	
 	public int getTotalNumberOfParallelSubtasks() {
-		return 1;
+		// the parallelism of the owning job vertex
+		return this.jobVertex.getParallelism();
 	}
 	
 	public int getParallelSubtaskIndex() {
-		return 0;
+		// zero-based index of this subtask, as passed to the constructor
+		return this.subTaskIndex;
+	}
+	
+	/** Returns the number of inputs, i.e. one per input of the underlying job vertex. */
+	public int getNumberOfInputs() {
+		return inputEdges.length;
+	}
+	
+	/**
+	 * Returns the edges that feed the given input.
+	 * 
+	 * @param input The zero-based input number.
+	 * @throws IllegalArgumentException if {@code input} is outside {@code [0, getNumberOfInputs())}.
+	 */
+	public ExecutionEdge2[] getInputEdges(int input) {
+		final int numInputs = inputEdges.length;
+		if (input >= 0 && input < numInputs) {
+			return inputEdges[input];
+		}
+		throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, numInputs));
+	}
+	
+	/** Returns the current execution state of this vertex. */
+	public ExecutionState2 getExecutionState() {
+		return this.state;
+	}
+	
+	/** Returns the cause recorded when this vertex switched to FAILED, or null if none was recorded. */
+	public Throwable getFailureCause() {
+		return this.failureCause;
+	}
+	
+	/** Returns the slot this vertex is currently assigned to, or null if there is none. */
+	public AllocatedSlot getAssignedResource() {
+		return this.assignedSlot;
+	}
+	
+	private ExecutionGraph getExecutionGraph() {
+		return jobVertex.getGraph();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Graph building
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Connects input {@code inputNumber} of this vertex to partitions of the given intermediate result,
+	 * following the distribution pattern of the given job edge, and registers every created edge as a
+	 * consumer (under {@code consumerNumber}) at its source partition.
+	 * 
+	 * @param inputNumber The input of this vertex that is being connected.
+	 * @param source The intermediate result that feeds this input.
+	 * @param edge The job graph edge; only its distribution pattern is used here.
+	 * @param consumerNumber The consumer number under which the edges are registered at the source partitions.
+	 * 
+	 * @throws RuntimeException if the distribution pattern is neither POINTWISE nor BIPARTITE.
+	 */
+	public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
+		
+		final DistributionPattern pattern = edge.getDistributionPattern();
+		final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
+		
+		ExecutionEdge2[] edges = null;
+		
+		switch (pattern) {
+			case POINTWISE:
+				edges = connectPointwise(sourcePartitions, inputNumber);
+				break;
+				
+			case BIPARTITE: 
+				edges = connectAllToAll(sourcePartitions, inputNumber);
+				break;
+				
+			default:
+				throw new RuntimeException("Unrecognized distribution pattern.");
+		
+		}
+		
+		this.inputEdges[inputNumber] = edges;
+		
+		// add the consumers to the source
+		for (ExecutionEdge2 ee : edges) {
+			ee.getSource().addConsumer(ee, consumerNumber);
+		}
+	}
+	
+	private ExecutionEdge2[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+		ExecutionEdge2[] edges = new ExecutionEdge2[sourcePartitions.length];
+		
+		for (int i = 0; i < sourcePartitions.length; i++) {
+			IntermediateResultPartition irp = sourcePartitions[i];
+			edges[i] = new ExecutionEdge2(irp, this, inputNumber);
+		}
+		
+		return edges;
+	}
+	
+	/**
+	 * Creates the edges for the POINTWISE pattern, where this subtask reads from a subset of the source partitions.
+	 * 
+	 * - As many sources as subtasks: exactly the partition with this subtask's index.
+	 * - Fewer sources than subtasks: exactly one partition; several subtasks share the same partition.
+	 * - More sources than subtasks: a contiguous range of partitions; the last subtask's range extends
+	 *   to the last partition.
+	 * 
+	 * @param sourcePartitions All partitions of the source intermediate result.
+	 * @param inputNumber The input of this vertex the edges belong to.
+	 * @return The edges of this subtask for that input.
+	 */
+	private ExecutionEdge2[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+		final int numSources = sourcePartitions.length;
+		final int parallelism = getTotalNumberOfParallelSubtasks();
+		
+		// simple case same number of sources as targets
+		if (numSources == parallelism) {
+			return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[subTaskIndex], this, inputNumber) };
+		}
+		else if (numSources < parallelism) {
+			
+			int sourcePartition;
+			
+			// check if the pattern is regular or irregular
+			// we use int arithmetics for regular, and floating point with rounding for irregular
+			if (parallelism % numSources == 0) {
+				// same number of targets per source
+				int factor = parallelism / numSources;
+				sourcePartition = subTaskIndex / factor;
+			}
+			else {
+				// different number of targets per source
+				// NOTE(review): float division; for very large parallelism, rounding could shift the boundaries.
+				// The exact integer form would be (int) ((long) subTaskIndex * numSources / parallelism).
+				float factor = ((float) parallelism) / numSources;
+				sourcePartition = (int) (subTaskIndex / factor);
+			}
+			
+			return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[sourcePartition], this, inputNumber) };
+		}
+		else {
+			if (numSources % parallelism == 0) {
+				// same number of sources per target
+				int factor = numSources / parallelism;
+				int startIndex = subTaskIndex * factor;
+				
+				ExecutionEdge2[] edges = new ExecutionEdge2[factor];
+				for (int i = 0; i < factor; i++) {
+					edges[i] = new ExecutionEdge2(sourcePartitions[startIndex + i], this, inputNumber);
+				}
+				return edges;
+			}
+			else {
+				// different number of sources per target. The end of subtask i and the start of subtask i+1
+				// use the same expression, so the ranges of neighboring subtasks neither overlap nor leave gaps.
+				float factor = ((float) numSources) / parallelism;
+				
+				int start = (int) (subTaskIndex * factor);
+				int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
+						sourcePartitions.length : 
+						(int) ((subTaskIndex + 1) * factor);
+				
+				ExecutionEdge2[] edges = new ExecutionEdge2[end - start];
+				for (int i = 0; i < edges.length; i++) {
+					edges[i] = new ExecutionEdge2(sourcePartitions[start + i], this, inputNumber);
+				}
+				
+				return edges;
+			}
+		}
 	}
 
 	
@@ -61,15 +261,398 @@ public class ExecutionVertex2 {
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * Schedules this vertex for execution and deploys it to the slot it receives from the scheduler.
+	 * 
+	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the task needs
+	 *       to be scheduled immediately and no resource is available. If the task is accepted by the scheduler, any
+	 *       error sets the vertex state to failed and triggers the recovery logic.
+	 * 
+	 * @param scheduler The scheduler that provides the slot, either queued or immediately, depending on whether
+	 *                  the execution graph allows queued scheduling.
+	 * 
+	 * @throws NullPointerException Thrown, if the scheduler is null.
+	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
+	 *                               A vertex that is already CANCELED is silently ignored instead.
+	 * @throws NoResourceAvailableException Thrown if no queued scheduling is allowed and no resources are currently available.
+	 */
+	public void scheduleForExecution(DefaultScheduler scheduler) throws NoResourceAvailableException {
+		if (scheduler == null) {
+			throw new NullPointerException();
+		}
+		
+		if (STATE_UPDATER.compareAndSet(this, CREATED, SCHEDULED)) {
+			
+			getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, SCHEDULED, null);
+			
+			ScheduledUnit toSchedule = new ScheduledUnit(this, jobVertex.getSlotSharingGroup());
+		
+			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
+			//     in all cases where the deployment failed. We use try/catch and try/finally clauses below to assure
+			//     that the slot is released before the vertex is failed.
+			
+			boolean queued = jobVertex.getGraph().isQueuedSchedulingAllowed();
+			if (queued) {
+				SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
+				
+				// deployment happens in the callback, once the slot has been allocated
+				future.setFutureAction(new SlotAllocationFutureAction() {
+					@Override
+					public void slotAllocated(AllocatedSlot slot) {
+						try {
+							deployToSlot(slot);
+						}
+						catch (Throwable t) {
+							try {
+								slot.releaseSlot();
+							} finally {
+								fail(t);
+							}
+						}
+					}
+				});
+			}
+			else {
+				// NOTE(review): if scheduleImmediately() throws, the exception propagates to the caller while the
+				// state remains SCHEDULED (the SCHEDULED notification has already been sent).
+				AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule);
+				try {
+					deployToSlot(slot);
+				}
+				catch (Throwable t) {
+					try {
+						slot.releaseSlot();
+					} finally {
+						fail(t);
+					}
+				}
+			}
+		}
+		else if (this.state == CANCELED) {
+			// this can occur very rarely through heavy races. if the task was canceled, we do not
+			// schedule it
+			return;
+		}
+		else {
+			throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
+		}
+	}
+	
+
+	/**
+	 * Deploys this vertex to the given slot.
+	 * 
+	 * The state is switched atomically from SCHEDULED (or, for tests, CREATED) to DEPLOYING, so only one
+	 * deployment call can proceed. The vertex is then bound to the slot, and the task is submitted to the
+	 * slot's TaskManager in an action passed to {@link #execute(Runnable)}, so this method does not wait
+	 * for the remote call. If the TaskManager accepts the task, the vertex switches to RUNNING; otherwise
+	 * it is failed.
+	 * 
+	 * @param slot The slot to deploy to. Must be alive.
+	 * 
+	 * @throws NullPointerException Thrown, if the slot is null.
+	 * @throws IllegalArgumentException Thrown, if the slot is not alive.
+	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED or SCHEDULED state, or if a
+	 *                               concurrent deployment call won the state transition.
+	 * @throws JobException Thrown, if the vertex could not be assigned to the slot.
+	 */
+	public void deployToSlot(final AllocatedSlot slot) throws JobException {
+		// sanity checks
+		if (slot == null) {
+			throw new NullPointerException();
+		}
+		if (!slot.isAlive()) {
+			throw new IllegalArgumentException("Cannot deploy to a slot that is not alive.");
+		}
+		
+		// make sure exactly one deployment call happens from the correct state
+		// note: the transition from CREATED to DEPLOYING is for testing purposes only
+		ExecutionState2 previous = this.state;
+		if (previous == SCHEDULED || previous == CREATED) {
+			if (!STATE_UPDATER.compareAndSet(this, previous, DEPLOYING)) {
+				// race condition, someone else beat us to the deploying call.
+				// this should actually not happen and indicates a race somewhere else
+				throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
+			}
+			
+			getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, DEPLOYING, null);
+		}
+		else {
+			// vertex may have been cancelled, or it was already scheduled
+			throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
+		}
+		
+		// good, we are allowed to deploy
+		if (!slot.setExecutedVertex(this)) {
+			throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
+		}
+		setAssignedSlot(slot);
+		
+		final TaskDeploymentDescriptor deployment = createDeploymentDescriptor();
+		
+		// we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
+		Runnable deployaction = new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					Instance instance = slot.getInstance();
+					instance.checkLibraryAvailability(getJobId());
+					
+					TaskOperationResult result = instance.getTaskManagerProxy().submitTask(deployment);
+					if (result.isSuccess()) {
+						switchToRunning();
+					}
+					else {
+						// deployment failed :(
+						fail(new Exception("Failed to deploy the task to slot " + slot + ": " + result.getDescription()));
+					}
+				}
+				catch (Throwable t) {
+					// some error occurred. fail the task
+					fail(t);
+				}
+			}
+		};
+		
+		execute(deployaction);
+	}
+	
+	/**
+	 * Called after the TaskManager accepted the deployment. It switches the state from DEPLOYING to RUNNING and
+	 * informs the execution graph and the job vertex.
+	 * 
+	 * If the state changed while the deploy call was in progress, the deployed task is canceled again. For
+	 * any state other than CANCELING, the vertex is additionally failed.
+	 * 
+	 * NOTE(review): if the vertex failed concurrently, fail() has already cleared the assigned slot. In that
+	 * case sendCancelRpcCall() throws IllegalStateException here, and the remote task is not canceled.
+	 */
+	private void switchToRunning() {
+		
+		// transition state
+		if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, DEPLOYING, RUNNING)) {
+			
+			getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, RUNNING, null);
+			
+			this.jobVertex.vertexSwitchedToRunning(subTaskIndex);
+		}
+		else {
+			// something happened while the call was in progress.
+			// typically, that means canceling while deployment was in progress
+			
+			ExecutionState2 currentState = ExecutionVertex2.this.state;
+			
+			if (currentState == CANCELING) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Concurrent canceling of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
+				}
+				
+				// cancel() could not reach the task before it was deployed, so send the cancel call now
+				sendCancelRpcCall();
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Concurrent unexpected state transition of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
+				}
+				
+				// undo the deployment
+				sendCancelRpcCall();
+				
+				// record the failure
+				fail(new Exception("Asynchronous state error. Execution Vertex switched to " + currentState + " while deployment was in progress."));
+			}
+		}
+	}
+	
+	/**
+	 * Cancels this vertex.
+	 * 
+	 * Vertices that have not been deployed yet (CREATED, SCHEDULED) go directly to CANCELED. DEPLOYING or
+	 * RUNNING vertices go to CANCELING, and a cancel call is sent to the TaskManager. The call has no effect
+	 * on vertices that are already CANCELING, CANCELED, FINISHED or FAILED.
+	 * 
+	 * NOTE(review): deployToSlot() switches to DEPLOYING before it assigns the slot. A cancel in that window
+	 * makes sendCancelRpcCall() throw IllegalStateException out of this method.
+	 * 
+	 * @throws IllegalStateException for a state not handled below.
+	 */
+	public void cancel() {
+		// depending on the previous state, we go directly to cancelled (no cancel call necessary)
+		// -- or to canceling (cancel call needs to be sent to the task manager)
+		
+		// because of several possibly previous states, we need to again loop until we make a
+		// successful atomic state transition
+		while (true) {
+			
+			ExecutionState2 current = this.state;
+			
+			if (current == CANCELING || current == CANCELED) {
+				// already taken care of, no need to cancel again
+				return;
+			}
+				
+			// these two are the common cases where we need to send a cancel call
+			else if (current == RUNNING || current == DEPLOYING) {
+				// try to transition to canceling, if successful, send the cancel call
+				if (STATE_UPDATER.compareAndSet(this, current, CANCELING)) {
+					
+					getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELING, null);
+					
+					sendCancelRpcCall();
+					return;
+				}
+				// else: fall through the loop
+			}
+			
+			else if (current == FINISHED || current == FAILED) {
+				// nothing to do any more. finished failed before it could be cancelled.
+				// in any case, the task is removed from the TaskManager already
+				return;
+			}
+			else if (current == CREATED || current == SCHEDULED) {
+				// from here, we can directly switch to cancelled, because no task has been deployed
+				if (STATE_UPDATER.compareAndSet(this, current, CANCELED)) {
+					
+					getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELED, null);
+					
+					return;
+				}
+				// else: fall through the loop
+			}
+			else {
+				throw new IllegalStateException(current.name());
+			}
+		}
+	}
+	
+	/**
+	 * Switches this vertex to FAILED with the given cause, unless it has already failed or was canceled.
+	 * 
+	 * On a successful transition, the cause is recorded, the execution graph is notified, the assigned slot
+	 * (if any) is released, and the owning job vertex is informed. The transition is retried in a loop,
+	 * because other threads may change the state concurrently.
+	 * 
+	 * @param t The cause of the failure.
+	 */
+	public void fail(Throwable t) {
+		
+		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
+		// the actual computation on the task manager is cleaned up by the taskmanager that noticed the failure
+		
+		// we may need to loop multiple times (in the presence of concurrent calls) in order to
+		// atomically switch to failed 
+		while (true) {
+			ExecutionState2 current = this.state;
+			
+			if (current == FAILED) {
+				// concurrently set to failed. It is enough to remember once that we failed (its sad enough)
+				return;
+			}
+			
+			if (current == CANCELED) {
+				// we are already canceled, a failure does not change that anymore
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s",
+							getSimpleName(), FAILED, current));
+				}
+				return;
+			}
+			
+			// we should be in DEPLOYING, RUNNING or CANCELING when a regular failure happens
+			if (current != DEPLOYING && current != RUNNING && current != CANCELING) {
+				// this should not happen. still, what else to do but to comply and go to the FAILED state
+				// at least we should complain loudly to the log, reporting the state we actually came from
+				LOG.error(String.format("Vertex %s unexpectedly went from state %s to %s with error: %s",
+						getSimpleName(), current, FAILED, t.getMessage()), t);
+			}
+			
+			if (STATE_UPDATER.compareAndSet(this, current, FAILED)) {
+				// success (in a manner of speaking)
+				this.failureCause = t;
+				
+				getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, FAILED, StringUtils.stringifyException(t));
+				
+				// release the slot (concurrency safe)
+				setAssignedSlot(null);
+				
+				this.jobVertex.vertexFailed(subTaskIndex);
+				
+				// leave the loop
+				return;
+			}
+			// else: the state changed concurrently, re-read it and try again
+		}
+	}
+	
+	/**
+	 * Sends the cancel call for this vertex to the TaskManager of the assigned slot. The call runs in an
+	 * action passed to {@link #execute(Runnable)} and is tried up to {@link #NUM_CANCEL_CALL_TRIES} times.
+	 * 
+	 * If the TaskManager reports success, the state goes from CANCELING to CANCELED (notifying the execution
+	 * graph and the job vertex), and the slot is released in any case. If every try throws, the vertex is failed.
+	 * 
+	 * @throws IllegalStateException if no slot is currently assigned.
+	 */
+	private void sendCancelRpcCall() {
+		// first of all, copy a reference to the stack. any concurrent change to the
+		// field does not affect us now
+		final AllocatedSlot slot = this.assignedSlot;
+		if (slot == null) {
+			throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
+		}
+		
+		Runnable cancelAction = new Runnable() {
+			
+			@Override
+			public void run() {
+				Throwable exception = null;
+				
+				for (int triesLeft = NUM_CANCEL_CALL_TRIES; triesLeft > 0; --triesLeft) {
+					
+					try {
+						// send the call. it may be that the task is not really there (asynchronous / overtaking messages)
+						// in which case it is fine (the deployer catches it)
+						TaskOperationResult result = slot.getInstance().getTaskManagerProxy().cancelTask(getJobvertexId(), subTaskIndex);
+						
+						if (result.isSuccess()) {
+							
+							// make sure that we release the slot
+							try {
+								// found and canceled
+								if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, CANCELING, CANCELED)) {
+									// we completed the call. notify the graph, like every other state transition does,
+									// and let the parent know we have cancelled
+									getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELED, null);
+									ExecutionVertex2.this.jobVertex.vertexCancelled(ExecutionVertex2.this.subTaskIndex);
+								}
+								else {
+									ExecutionState2 foundState = ExecutionVertex2.this.state;
+									// failing in the meantime may happen and is no problem
+									if (foundState != FAILED) {
+										// corner case? log at least
+										LOG.error(String.format("Asynchronous race: Found state %s after successful cancel call.", foundState));
+									}
+								}
+							} finally {
+								slot.releaseSlot();
+							}
+						}
+						else {
+							// the task was not found, which may be when the task concurrently finishes or fails, or
+							// when the cancel call overtakes the deployment call
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Cancel task call did not find task. Probably cause: Acceptable asynchronous race.");
+							}
+						}
+						
+						// in any case, we need not call multiple times, so we quit
+						return;
+					}
+					catch (Throwable t) {
+						if (exception == null) {
+							exception = t;
+						}
+						// triesLeft still counts the attempt that just failed, so one less remains
+						LOG.error("Canceling vertex " + getSimpleName() + " failed (" + (triesLeft - 1) + " tries left): " + t.getMessage() , t);
+					}
+				}
+				
+				// dang, utterly unsuccessful - the target node must be down, in which case the tasks are lost anyways
+				fail(new Exception("Task could not be canceled.", exception));
+			}
+		};
+		
+		execute(cancelAction);
+	}
+	
 	public Iterable<Instance> getPreferredLocations() {
+		// NOTE(review): no location preferences are computed yet; callers must cope with a null result
 		return null;
 	}
 	
+	/**
+	 * Atomically replaces the assigned slot and releases the previously assigned slot, if there was one.
+	 * A failure while releasing the previous slot is logged at debug level and otherwise ignored.
+	 * 
+	 * @param slot The new slot, or {@code null} to clear the assignment.
+	 */
+	private void setAssignedSlot(AllocatedSlot slot) {
+		
+		while (true) {
+			AllocatedSlot previous = this.assignedSlot;
+			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, previous, slot)) {
+				// successfully swapped
+				// release the predecessor, if it was not null. this call is idempotent, so it does not matter if it is
+				// called more than once
+				if (previous != null) {
+					try {
+						previous.releaseSlot();
+					} catch (Throwable t) {
+						// best effort: the swap already happened; report the slot that failed to be released
+						LOG.debug("Error releasing slot " + previous, t);
+					}
+				}
+				return;
+			}
+			// else: a concurrent swap happened, re-read and try again
+		}
+	}
+	
+	
+	private TaskDeploymentDescriptor createDeploymentDescriptor() {
+		//  create the input gate deployment descriptors
+		List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
+		for (ExecutionEdge2[] channels : inputEdges) {
+			inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
+		}
+		
+		// create the output gate deployment descriptors
+		List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
+		for (IntermediateResultPartition partition : resultPartitions) {
+			for (List<ExecutionEdge2> channels : partition.getConsumers()) {
+				outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
+			}
+		}
+		
+		String[] jarFiles = getExecutionGraph().getUserCodeJarFiles();
+		
+		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), getTaskName(), 
+				subTaskIndex, getTotalNumberOfParallelSubtasks(), 
+				getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
+				jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
 	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * Hands the given action to the owning job vertex for execution. The deploy and cancel calls of this
+	 * class use it to run their remote calls without blocking the caller.
+	 * 
+	 * @param action The action to run.
+	 */
+	public void execute(Runnable action) {
+		jobVertex.execute(action);
+	}
+	
 	/**
 	 * Creates a simple name representation in the style 'taskname (x/y)', where
 	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
@@ -81,4 +664,9 @@ public class ExecutionVertex2 {
 	public String getSimpleName() {
+		// the subtask index is shown one-based, e.g. "name (1/4)" for subtask index 0 of 4
 		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
 	}
+	
+	/** Returns the simple name of this vertex followed by its current state in brackets. */
+	@Override
+	public String toString() {
+		final String name = getSimpleName();
+		return name + " [" + state + ']';
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java
deleted file mode 100644
index 87e2120..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-
-/**
- * A class for statistically unique execution vertex IDs.
- * 
- */
-public class ExecutionVertexID extends AbstractID {
-
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * Creates a new random execution vertex id.
-	 */
-	public ExecutionVertexID() {
-		super();
-	}
-	
-	/**
-	 * Creates a new execution vertex id, equal to the given id.
-	 * 
-	 * @param from The id to copy.
-	 */
-	public ExecutionVertexID(AbstractID from) {
-		super(from);
-	}
-	
-	/**
-	 * Converts the execution vertex ID into a
-	 * management vertex ID. The new management vertex ID
-	 * will be equal to the execution vertex ID in the sense
-	 * that the <code>equals</code> method will return <code>
-	 * true</code> when both IDs are compared.
-	 * 
-	 * @return the new management vertex ID
-	 */
-	public ManagementVertexID toManagementVertexID() {
-		return new ManagementVertexID(this);
-	}
-
-	/**
-	 * Converts the given management vertex ID into the corresponding execution vertex ID. The new execution vertex ID
-	 * will be equals to the management vertex ID in the sense that the <code>equals</code> method will return
-	 * <code>true</code> when both IDs are compared.
-	 * 
-	 * @param vertexID
-	 *        the management vertex ID to be converted
-	 * @return the resulting execution vertex ID
-	 */
-	public static ExecutionVertexID fromManagementVertexID(ManagementVertexID vertexID) {
-		return new ExecutionVertexID(vertexID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java
deleted file mode 100644
index 75e2e95..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-/**
- * A graph conversion exception is thrown if the creation of transformation
- * of an {@link ExecutionGraph} fails.
- * 
- */
-public class GraphConversionException extends Exception {
-
-	/**
-	 * Generated serial version UID.
-	 */
-	private static final long serialVersionUID = -7623370680208569211L;
-
-	/**
-	 * Creates a new exception with the given error message.
-	 * 
-	 * @param msg
-	 *        the error message to be transported through this exception
-	 */
-	public GraphConversionException(String msg) {
-		super(msg);
-	}
-
-	public GraphConversionException(String message, Throwable cause) {
-		super(message, cause);
-	}
-
-	public GraphConversionException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
new file mode 100644
index 0000000..540996f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+public class IntermediateResult {
+
+	private final IntermediateDataSetID id;
+	
+	private final ExecutionJobVertex producer;
+	
+	private final IntermediateResultPartition[] partitions;
+	
+	private final int numParallelProducers;
+	
+	private int partitionsAssigned;
+	
+	private int numConsumers;
+	
+	
+	public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
+		this.id = id;
+		this.producer = producer;
+		this.partitions = new IntermediateResultPartition[numParallelProducers];
+		this.numParallelProducers = numParallelProducers;
+		
+		// we do not set the intermediate result partitions here, because we let them be initialized by
+		// the execution vertex that produces them
+	}
+	
+	public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
+		if (partition == null || partitionNumber < 0 || partitionNumber >= numParallelProducers) {
+			throw new IllegalArgumentException();
+		}
+		
+		if (partitions[partitionNumber] != null) {
+			throw new IllegalStateException("Partition #" + partitionNumber + " has already been assigned.");
+		}
+		
+		partitions[partitionNumber] = partition;
+		partitionsAssigned++;
+	}
+	
+	
+	
+	public IntermediateDataSetID getId() {
+		return id;
+	}
+	
+	public IntermediateResultPartition[] getPartitions() {
+		return partitions;
+	}
+	
+	public int registerConsumer() {
+		final int index = numConsumers;
+		numConsumers++;
+		
+		for (IntermediateResultPartition p : partitions) {
+			if (p.addConsumerGroup() != index) {
+				throw new RuntimeException("Inconsistent consumer mapping between intermediate result partitions.");
+			}
+		}
+		return index;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
new file mode 100644
index 0000000..13bb930
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class IntermediateResultPartition {
+	
+	private final IntermediateResult totalResut;
+	
+	private final ExecutionVertex2 producer;
+	
+	private final int partition;
+	
+	private List<List<ExecutionEdge2>> consumers;
+	
+	
+	public IntermediateResultPartition(IntermediateResult totalResut, ExecutionVertex2 producer, int partition) {
+		this.totalResut = totalResut;
+		this.producer = producer;
+		this.partition = partition;
+		this.consumers = new ArrayList<List<ExecutionEdge2>>(0);
+	}
+	
+	public ExecutionVertex2 getProducer() {
+		return producer;
+	}
+	
+	public int getPartition() {
+		return partition;
+	}
+	
+	public IntermediateResult getIntermediateResult() {
+		return totalResut;
+	}
+	
+	public List<List<ExecutionEdge2>> getConsumers() {
+		return consumers;
+	}
+	
+	int addConsumerGroup() {
+		int pos = consumers.size();
+		consumers.add(new ArrayList<ExecutionEdge2>());
+		return pos;
+	}
+	
+	public void addConsumer(ExecutionEdge2 edge, int consumerNumber) {
+		consumers.get(consumerNumber).add(edge);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java
deleted file mode 100644
index f7ccda1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-/**
- * This enumeration contains all states a job represented by an {@link ExecutionGraph} can have during its lifetime. It
- * contains all states from {@link JobStatus} but also internal states to keep track of shutdown processes.
- * <p>
- * This class is thread-safe.
- * 
- */
-public enum InternalJobStatus {
-
-	/**
-	 * All tasks of the job are in the execution state CREATED.
-	 */
-	CREATED,
-
-	/**
-	 * All tasks of the job have been accepted by the scheduler, resources have been requested
-	 */
-	SCHEDULED,
-
-	/**
-	 * At least one task of the job is running, none has definitely failed.
-	 */
-	RUNNING,
-
-	/**
-	 * At least one task of the job has definitely failed and cannot be recovered. The job is in the process of being
-	 * terminated.
-	 */
-	FAILING,
-
-	/**
-	 * At least one task of the job has definitively failed and cannot
-	 * be recovered anymore. As a result, the job has been terminated.
-	 */
-	FAILED,
-
-	/**
-	 * At least one task has been canceled as a result of a user request. The job is in the process of being canceled
-	 * completely.
-	 */
-	CANCELING,
-
-	/**
-	 * All tasks of the job are canceled as a result of a user request. The job has been terminated.
-	 */
-	CANCELED,
-
-	/**
-	 * All of the job's tasks have successfully finished.
-	 */
-	FINISHED;
-
-	/**
-	 * Converts an internal job status in a {@link JobStatus} state.
-	 * 
-	 * @param status
-	 *        the internal job status to converted.
-	 * @return the corresponding job status or <code>null</code> if no corresponding job status exists
-	 */
-	@SuppressWarnings("incomplete-switch")
-	public static JobStatus toJobStatus(InternalJobStatus status) {
-
-		switch (status) {
-
-		case CREATED:
-			return JobStatus.CREATED;
-		case SCHEDULED:
-			return JobStatus.SCHEDULED;
-		case RUNNING:
-			return JobStatus.RUNNING;
-		case FAILED:
-			return JobStatus.FAILED;
-		case CANCELED:
-			return JobStatus.CANCELED;
-		case FINISHED:
-			return JobStatus.FINISHED;
-		}
-
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
index aabed5c..512a381 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -16,25 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
 /**
- * This interface allows objects to receive notifications
- * when the status of an observed job has changed.
- * 
+ * This interface allows objects to receive notifications when the status of an observed job has changed.
  */
 public interface JobStatusListener {
 
-/**
-	 * Called when the status of the job with the given {@li
+	/**
+	 * Called when the status of the job changed.
 	 * 
-	 * @param executionGraph
-	 *        the executionGraph representing the job the event belongs to
-	 * @param newJobStatus
-	 *        the new job status
-	 * @param optionalMessage
-	 *        an optional message (possibly <code>null</code>) that can be attached to the state change
+	 * @param executionGraph   The executionGraph representing the job.
+	 * @param newJobStatus     The new job status.
+	 * @param optionalMessage  An optional message (possibly <code>null</code>) that can be attached to the state change.
 	 */
-	void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage);
+	void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index cb7e658..f3f489b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -30,7 +31,10 @@ public class AllocatedSlot {
 	
 	private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER = 
 			AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
-	 
+	
+	private static final AtomicReferenceFieldUpdater<AllocatedSlot, ExecutionVertex2> VERTEX_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, ExecutionVertex2.class, "executedVertex");
+	
 	private static final int ALLOCATED_AND_ALIVE = 0;		// tasks may be added and might be running
 	private static final int CANCELLED = 1;					// no more tasks may run
 	private static final int RELEASED = 2;					// has been given back to the instance
@@ -45,10 +49,13 @@ public class AllocatedSlot {
 	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
 	
+	/** Vertex being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
+	private volatile ExecutionVertex2 executedVertex;
+	
+	/** The state of the vertex, only atomically updated */
 	private volatile int status = ALLOCATED_AND_ALIVE;
 	
 
-
 	public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) {
 		if (jobID == null || instance == null || slotNumber < 0) {
 			throw new IllegalArgumentException();
@@ -78,18 +85,34 @@ public class AllocatedSlot {
 		return slotNumber;
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * @param vertex
-	 * 
-	 * @return True, if the task was scheduled correctly, false if the slot was asynchronously deallocated
-	 *         in the meantime.
-	 */
-	public boolean runTask(ExecutionVertex2 vertex) {
+	public boolean setExecutedVertex(ExecutionVertex2 executedVertex) {
+		if (executedVertex == null) {
+			throw new NullPointerException();
+		}
+		
+		// check that we can actually run in this slot
+		if (status != ALLOCATED_AND_ALIVE) {
+			return false;
+		}
+		
+		// atomically assign the vertex
+		if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
+			return false;
+		}
+
+		// we need to do a double check that we were not cancelled in the meantime
+		if (status != ALLOCATED_AND_ALIVE) {
+			this.executedVertex = null;
+			return false;
+		}
+		
 		return true;
 	}
 	
+	public ExecutionVertex2 getExecutedVertex() {
+		return executedVertex;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Status and life cycle
 	// --------------------------------------------------------------------------------------------
@@ -110,6 +133,9 @@ public class AllocatedSlot {
 	public void cancel() {
 		if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
 			// kill all tasks currently running in this slot
+			if (this.executedVertex != null) {
+				this.executedVertex.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
deleted file mode 100644
index eca23c4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * A simple implementation of an {@link InstanceManager}.
- */
-public class DefaultInstanceManager implements InstanceManager {
-
-	private static final Logger LOG = LoggerFactory.getLogger(DefaultInstanceManager.class);
-
-	// ------------------------------------------------------------------------
-	// Fields
-	// ------------------------------------------------------------------------
-
-	/** Global lock */
-	private final Object lock = new Object();
-
-	/** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
-	private final Map<InstanceID, Instance> registeredHostsById;
-
-	/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
-	private final Map<InstanceConnectionInfo, Instance> registeredHostsByConnection;
-	
-	/** Set of hosts that were present once and have died */
-	private final Set<InstanceConnectionInfo> deadHosts;
-	
-	/** Listeners that want to be notified about availability and disappearance of instances */
-	private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
-
-	/** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
-	private final long heartbeatTimeout;
-	
-	/** The total number of task slots that the system has */
-	private int totalNumberOfAliveTaskSlots;
-
-	/** Flag marking the system as shut down */
-	private volatile boolean shutdown;
-
-	// ------------------------------------------------------------------------
-	// Constructor and set-up
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Creates an instance manager, using the global configuration value for maximum interval between heartbeats
-	 * where a task manager is still considered alive.
-	 */
-	public DefaultInstanceManager() {
-		this(1000 * GlobalConfiguration.getLong(
-				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
-	}
-	
-	public DefaultInstanceManager(long heartbeatTimeout) {
-		this(heartbeatTimeout, heartbeatTimeout);
-	}
-	
-	public DefaultInstanceManager(long heartbeatTimeout, long cleanupInterval) {
-		if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
-			throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive.");
-		}
-		
-		this.registeredHostsById = new HashMap<InstanceID, Instance>();
-		this.registeredHostsByConnection = new HashMap<InstanceConnectionInfo, Instance>();
-		this.deadHosts = new HashSet<InstanceConnectionInfo>();
-		this.heartbeatTimeout = heartbeatTimeout;
-
-		new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
-	}
-
-	@Override
-	public void shutdown() {
-		synchronized (this.lock) {
-			if (this.shutdown) {
-				return;
-			}
-			this.shutdown = true;
-
-			this.cleanupStaleMachines.cancel();
-
-			for (Instance i : this.registeredHostsById.values()) {
-				i.markDead();
-			}
-			
-			this.registeredHostsById.clear();
-			this.registeredHostsByConnection.clear();
-			this.deadHosts.clear();
-			this.totalNumberOfAliveTaskSlots = 0;
-		}
-	}
-
-	@Override
-	public boolean reportHeartBeat(InstanceID instanceId) {
-		if (instanceId == null) {
-			throw new IllegalArgumentException("InstanceID may not be null.");
-		}
-		
-		synchronized (this.lock) {
-			if (this.shutdown) {
-				throw new IllegalStateException("InstanceManager is shut down.");
-			}
-			
-			Instance host = registeredHostsById.get(instanceId);
-
-			if (host == null){
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Received hearbeat from unknown TaskManager with instance ID " + instanceId.toString() + 
-							" Possibly TaskManager was maked as dead (timed-out) earlier. " +
-							"Reporting back that task manager is no longer known.");
-				}
-				return false;
-			}
-
-			host.reportHeartBeat();
-			return true;
-		}
-	}
-
-	@Override
-	public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfSlots){
-		synchronized(this.lock){
-			if (this.shutdown) {
-				throw new IllegalStateException("InstanceManager is shut down.");
-			}
-			
-			Instance prior = registeredHostsByConnection.get(instanceConnectionInfo);
-			if (prior != null) {
-				LOG.error("Registration attempt from TaskManager with connection info " + instanceConnectionInfo + 
-						". This connection is already registered under ID " + prior.getId());
-				return null;
-			}
-			
-			boolean wasDead = this.deadHosts.remove(instanceConnectionInfo);
-			if (wasDead) {
-				LOG.info("Registering TaskManager with connection info " + instanceConnectionInfo + 
-						" which was marked as dead earlier because of a heart-beat timeout.");
-			}
-
-			InstanceID id = null;
-			do {
-				id = new InstanceID();
-			} while (registeredHostsById.containsKey(id));
-			
-			
-			Instance host = new Instance(instanceConnectionInfo, id, resources, numberOfSlots);
-			
-			registeredHostsById.put(id, host);
-			registeredHostsByConnection.put(instanceConnectionInfo, host);
-			
-			totalNumberOfAliveTaskSlots += numberOfSlots;
-			
-			if (LOG.isInfoEnabled()) {
-				LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
-						instanceConnectionInfo, id, registeredHostsById.size()));
-			}
-
-			host.reportHeartBeat();
-			
-			// notify all listeners (for example the scheduler)
-			notifyNewInstance(host);
-			
-			return id;
-		}
-	}
-
-	@Override
-	public int getNumberOfRegisteredTaskManagers() {
-		return this.registeredHostsById.size();
-	}
-
-	@Override
-	public int getTotalNumberOfSlots() {
-		return this.totalNumberOfAliveTaskSlots;
-	}
-	
-	@Override
-	public Map<InstanceID, Instance> getAllRegisteredInstances() {
-		return this.registeredHostsById;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public void addInstanceListener(InstanceListener listener) {
-		synchronized (this.instanceListeners) {
-			this.instanceListeners.add(listener);
-		}
-	}
-	
-	public void removeInstanceListener(InstanceListener listener) {
-		synchronized (this.instanceListeners) {
-			this.instanceListeners.remove(listener);
-		}
-	}
-	
-	private void notifyNewInstance(Instance instance) {
-		synchronized (this.instanceListeners) {
-			for (InstanceListener listener : this.instanceListeners) {
-				try {
-					listener.newInstanceAvailable(instance);
-				}
-				catch (Throwable t) {
-					LOG.error("Notification of new instance availability failed.", t);
-				}
-			}
-		}
-	}
-	
-	private void notifyDeadInstance(Instance instance) {
-		synchronized (this.instanceListeners) {
-			for (InstanceListener listener : this.instanceListeners) {
-				try {
-					listener.instanceDied(instance);
-				}
-				catch (Throwable t) {
-					LOG.error("Notification of dead instance failed.", t);
-				}
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void checkForDeadInstances() {
-		final long now = System.currentTimeMillis();
-		final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
-		
-		synchronized (DefaultInstanceManager.this.lock) {
-			if (DefaultInstanceManager.this.shutdown) {
-				return;
-			}
-
-			final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
-			
-			// check all hosts whether they did not send heart-beat messages.
-			while (entries.hasNext()) {
-				
-				final Map.Entry<InstanceID, Instance> entry = entries.next();
-				final Instance host = entry.getValue();
-				
-				if (!host.isStillAlive(now, timeout)) {
-					
-					// remove from the living
-					entries.remove();
-					registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
-					
-					// add to the dead
-					deadHosts.add(host.getInstanceConnectionInfo());
-					
-					host.markDead();
-					
-					totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
-					
-					LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
-							host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
-					
-					// report to all listeners
-					notifyDeadInstance(host);
-				}
-			}
-		}
-	}
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Periodic task that checks whether hosts have not sent their heart-beat
-	 * messages and purges the hosts in this case.
-	 */
-	private final TimerTask cleanupStaleMachines = new TimerTask() {
-		@Override
-		public void run() {
-			try {
-				checkForDeadInstances();
-			}
-			catch (Throwable t) {
-				LOG.error("Checking for dead instances failed.", t);
-			}
-		}
-	};
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index a168b2c..543ae86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -27,6 +27,10 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailablilityListener;
@@ -186,6 +190,28 @@ public class Instance {
 		}
 	}
 	
+	public void checkLibraryAvailability(JobID jobID) throws IOException {
+		String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
+
+		if (requiredLibraries == null) {
+			throw new IOException("No entry of required libraries for job " + jobID);
+		}
+
+		LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
+		request.setRequiredLibraries(requiredLibraries);
+
+		// Send the request
+		LibraryCacheProfileResponse response = getTaskManagerProxy().getLibraryCacheProfile(request);
+
+		// Check response and transfer libraries if necessary
+		for (int k = 0; k < requiredLibraries.length; k++) {
+			if (!response.isCached(k)) {
+				LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
+				getTaskManagerProxy().updateLibraryCache(update);
+			}
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	// Heartbeats
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 09c384a..5e139f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -18,22 +18,360 @@
 
 package org.apache.flink.runtime.instance;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 
 /**
  * Simple manager that keeps track of which TaskManager are available and alive.
  */
-public interface InstanceManager {
+public class InstanceManager {
+
+	private static final Log LOG = LogFactory.getLog(InstanceManager.class);
+
+	// ------------------------------------------------------------------------
+	// Fields
+	// ------------------------------------------------------------------------
+
+	/** Lock guarding the host collections, the slot count and the shutdown transition. */
+	private final Object lock = new Object();
+
+	/** TaskManagers that are currently registered and considered alive, by instance ID. */
+	private final Map<InstanceID, Instance> registeredHostsById;
+
+	/** TaskManagers that are currently registered and considered alive, by connection info. */
+	private final Map<InstanceConnectionInfo, Instance> registeredHostsByConnection;
+	
+	/** Connections of TaskManagers that were registered once and were marked dead after a heartbeat timeout. */
+	private final Set<InstanceConnectionInfo> deadHosts;
+	
+	/** Listeners that want to be notified about availability and disappearance of instances. */
+	private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
+	
+	/** Daemon timer that runs the periodic dead-instance check; cancelled on shutdown. */
+	private final Timer cleanupTimer;
 
-	InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfTaskSlots);
+	/** Duration (in milliseconds) after which a task manager is considered dead if it did not send a heart-beat message. */
+	private final long heartbeatTimeout;
 	
-	boolean reportHeartBeat(InstanceID instance);
+	/** The total number of task slots of all alive TaskManagers. Only written while holding the lock;
+	 *  volatile so that the unsynchronized {@link #getTotalNumberOfSlots()} sees current values. */
+	private volatile int totalNumberOfAliveTaskSlots;
+
+	/** Flag marking the system as shut down */
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+	// Constructor and set-up
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Creates an instance manager, using the global configuration value for maximum interval between heartbeats
+	 * where a task manager is still considered alive. The configured value is multiplied by 1000, i.e. it is
+	 * taken to be in seconds. The check for dead instances runs once per timeout interval.
+	 */
+	public InstanceManager() {
+		this(1000 * GlobalConfiguration.getLong(
+				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
+	}
+	
+	/**
+	 * Creates an instance manager that checks for dead instances once per heartbeat timeout interval.
+	 * 
+	 * @param heartbeatTimeout Time (in milliseconds) without a heartbeat after which a TaskManager is considered dead.
+	 */
+	public InstanceManager(long heartbeatTimeout) {
+		this(heartbeatTimeout, heartbeatTimeout);
+	}
+	
+	/**
+	 * Creates an instance manager and starts a daemon timer that periodically removes TaskManagers
+	 * whose heartbeats timed out.
+	 * 
+	 * @param heartbeatTimeout Time (in milliseconds) without a heartbeat after which a TaskManager is considered dead.
+	 * @param cleanupInterval Time (in milliseconds) between two checks for dead TaskManagers.
+	 * @throws IllegalArgumentException Thrown, if the timeout or the interval is not positive.
+	 */
+	public InstanceManager(long heartbeatTimeout, long cleanupInterval) {
+		if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
+			throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive.");
+		}
+		
+		this.registeredHostsById = new HashMap<InstanceID, Instance>();
+		this.registeredHostsByConnection = new HashMap<InstanceConnectionInfo, Instance>();
+		this.deadHosts = new HashSet<InstanceConnectionInfo>();
+		this.heartbeatTimeout = heartbeatTimeout;
+
+		// keep a reference to the timer, so that shutdown() can terminate its thread
+		this.cleanupTimer = new Timer(true);
+		this.cleanupTimer.schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
+	}
+
+	/**
+	 * Shuts the instance manager down: stops the periodic dead-instance check together with its
+	 * timer thread, marks all registered instances as dead and clears all bookkeeping.
+	 * Calling this method again after the first call has no effect.
+	 */
+	public void shutdown() {
+		synchronized (this.lock) {
+			if (this.shutdown) {
+				return;
+			}
+			this.shutdown = true;
+
+			this.cleanupStaleMachines.cancel();
+			// cancelling only the task would leave the timer's thread running
+			this.cleanupTimer.cancel();
 
-	void shutdown();
+			for (Instance i : this.registeredHostsById.values()) {
+				i.markDead();
+			}
+			
+			this.registeredHostsById.clear();
+			this.registeredHostsByConnection.clear();
+			this.deadHosts.clear();
+			this.totalNumberOfAliveTaskSlots = 0;
+		}
+	}
 
-	Map<InstanceID, Instance> getAllRegisteredInstances();
+	/**
+	 * Records a heartbeat from the TaskManager registered under the given ID.
+	 * 
+	 * @param instanceId The ID under which the TaskManager was registered.
+	 * @return True, if the TaskManager is known and the heartbeat was recorded; false, if no
+	 *         TaskManager is registered under that ID (for example, because it was marked as dead).
+	 * @throws IllegalArgumentException Thrown, if the instance ID is null.
+	 * @throws IllegalStateException Thrown, if the instance manager has been shut down.
+	 */
+	public boolean reportHeartBeat(InstanceID instanceId) {
+		if (instanceId == null) {
+			throw new IllegalArgumentException("InstanceID may not be null.");
+		}
+		
+		synchronized (this.lock) {
+			if (this.shutdown) {
+				throw new IllegalStateException("InstanceManager is shut down.");
+			}
+			
+			Instance host = registeredHostsById.get(instanceId);
 
-	int getNumberOfRegisteredTaskManagers();
+			if (host == null) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Received heartbeat from unknown TaskManager with instance ID " + instanceId + 
+							". Possibly TaskManager was marked as dead (timed-out) earlier. " +
+							"Reporting back that task manager is no longer known.");
+				}
+				return false;
+			}
 
-	int getTotalNumberOfSlots();
+			host.reportHeartBeat();
+			return true;
+		}
+	}
+
+	/**
+	 * Registers a new TaskManager, assigns it a fresh instance ID, records its slots and notifies
+	 * all instance listeners about the new instance.
+	 * 
+	 * @param instanceConnectionInfo The connection info of the registering TaskManager.
+	 * @param resources The hardware description of the TaskManager.
+	 * @param numberOfSlots The number of task slots the TaskManager offers.
+	 * @return The ID assigned to the TaskManager, or null, if a TaskManager with the same connection
+	 *         info is already registered.
+	 * @throws IllegalStateException Thrown, if the instance manager has been shut down.
+	 */
+	public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfSlots){
+		synchronized(this.lock){
+			if (this.shutdown) {
+				throw new IllegalStateException("InstanceManager is shut down.");
+			}
+			
+			Instance prior = registeredHostsByConnection.get(instanceConnectionInfo);
+			if (prior != null) {
+				LOG.error("Registration attempt from TaskManager with connection info " + instanceConnectionInfo + 
+						". This connection is already registered under ID " + prior.getId());
+				return null;
+			}
+			
+			boolean wasDead = this.deadHosts.remove(instanceConnectionInfo);
+			if (wasDead) {
+				LOG.info("Registering TaskManager with connection info " + instanceConnectionInfo + 
+						" which was marked as dead earlier because of a heart-beat timeout.");
+			}
+
+			// generate IDs until one is found that is not already in use
+			InstanceID id;
+			do {
+				id = new InstanceID();
+			} while (registeredHostsById.containsKey(id));
+			
+			Instance host = new Instance(instanceConnectionInfo, id, resources, numberOfSlots);
+			
+			registeredHostsById.put(id, host);
+			registeredHostsByConnection.put(instanceConnectionInfo, host);
+			
+			totalNumberOfAliveTaskSlots += numberOfSlots;
+			
+			if (LOG.isInfoEnabled()) {
+				LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
+						instanceConnectionInfo, id, registeredHostsById.size()));
+			}
+
+			host.reportHeartBeat();
+			
+			// notify all listeners (for example the scheduler)
+			notifyNewInstance(host);
+			
+			return id;
+		}
+	}
+
+	/**
+	 * Returns the number of currently registered TaskManagers. The count is read without acquiring
+	 * the lock, so it may not reflect concurrent registrations or removals.
+	 */
+	public int getNumberOfRegisteredTaskManagers() {
+		return this.registeredHostsById.size();
+	}
+
+	/**
+	 * Returns the total number of task slots of all currently registered TaskManagers.
+	 */
+	public int getTotalNumberOfSlots() {
+		return this.totalNumberOfAliveTaskSlots;
+	}
+	
+	/**
+	 * Returns a snapshot copy of all currently registered instances, keyed by their instance ID.
+	 */
+	public Map<InstanceID, Instance> getAllRegisteredInstances() {
+		synchronized (this.lock) {
+			return new HashMap<InstanceID, Instance>(this.registeredHostsById);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Adds a listener that is notified when instances become available or die. Listener callbacks
+	 * are invoked while this manager's internal lock is held, so listeners should return quickly.
+	 */
+	public void addInstanceListener(InstanceListener listener) {
+		synchronized (this.instanceListeners) {
+			this.instanceListeners.add(listener);
+		}
+	}
+	
+	/**
+	 * Removes a previously added instance listener.
+	 */
+	public void removeInstanceListener(InstanceListener listener) {
+		synchronized (this.instanceListeners) {
+			this.instanceListeners.remove(listener);
+		}
+	}
+	
+	/** Calls all listeners for a new instance; a failing listener is logged and does not affect the others. */
+	private void notifyNewInstance(Instance instance) {
+		synchronized (this.instanceListeners) {
+			for (InstanceListener listener : this.instanceListeners) {
+				try {
+					listener.newInstanceAvailable(instance);
+				}
+				catch (Throwable t) {
+					LOG.error("Notification of new instance availability failed.", t);
+				}
+			}
+		}
+	}
+	
+	/** Calls all listeners for a dead instance; a failing listener is logged and does not affect the others. */
+	private void notifyDeadInstance(Instance instance) {
+		synchronized (this.instanceListeners) {
+			for (InstanceListener listener : this.instanceListeners) {
+				try {
+					listener.instanceDied(instance);
+				}
+				catch (Throwable t) {
+					LOG.error("Notification of dead instance failed.", t);
+				}
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Removes every registered TaskManager whose last heartbeat is older than the heartbeat timeout,
+	 * remembers its connection as dead, and notifies the instance listeners.
+	 */
+	private void checkForDeadInstances() {
+		final long now = System.currentTimeMillis();
+		final long timeout = this.heartbeatTimeout;
+		
+		synchronized (this.lock) {
+			if (this.shutdown) {
+				return;
+			}
+
+			final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
+			
+			// check all hosts whether they did not send heart-beat messages.
+			while (entries.hasNext()) {
+				
+				final Map.Entry<InstanceID, Instance> entry = entries.next();
+				final Instance host = entry.getValue();
+				
+				if (!host.isStillAlive(now, timeout)) {
+					
+					// remove from the living
+					entries.remove();
+					registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
+					
+					// add to the dead
+					deadHosts.add(host.getInstanceConnectionInfo());
+					
+					host.markDead();
+					
+					totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
+					
+					LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
+							host.getId(), host.getInstanceConnectionInfo(), timeout, registeredHostsById.size()));
+					
+					// report to all listeners
+					notifyDeadInstance(host);
+				}
+			}
+		}
+	}
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Periodic task that checks whether hosts have not sent their heart-beat
+	 * messages and purges the hosts in this case. Errors are caught and logged,
+	 * so that they do not terminate the timer thread.
+	 */
+	private final TimerTask cleanupStaleMachines = new TimerTask() {
+		@Override
+		public void run() {
+			try {
+				checkForDeadInstances();
+			}
+			catch (Throwable t) {
+				LOG.error("Checking for dead instances failed.", t);
+			}
+		}
+	};
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
index f8f41ae..832b8cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -29,7 +28,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 
-public class LocalInstanceManager extends DefaultInstanceManager {
+/**
+ * A variant of the {@link InstanceManager} that internally spawns task managers as instances, rather than waiting for external
+ * TaskManagers to register.
+ */
+public class LocalInstanceManager extends InstanceManager {
 	
 	private final List<TaskManager> taskManagers = new ArrayList<TaskManager>();
 
@@ -55,7 +58,7 @@ public class LocalInstanceManager extends DefaultInstanceManager {
 				GlobalConfiguration.includeConfiguration(tm);
 			}
 
-			taskManagers.add(new TaskManager(execMode));
+			taskManagers.add(TaskManager.createTaskManager(execMode));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index cd2e9ca..772a4f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.EOFException;
@@ -28,9 +27,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
 
-
 /**
- * A {@link DataInputView} that is backed by a {@link BlockChannelReader}, making it effectively a data input
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
+ * {@link BlockChannelReader}, making it effectively a data input
  * stream. The view reads it data in blocks from the underlying channel. The view can only read data that
  * has been written by a {@link ChannelWriterOutputView}, due to block formatting.
  */
@@ -245,8 +244,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	
 
 	@Override
-	protected int getLimitForSegment(MemorySegment segment)
-	{
+	protected int getLimitForSegment(MemorySegment segment) {
 		return segment.getInt(ChannelWriterOutputView.HEAD_BLOCK_LENGTH_OFFSET);
 	}
 	
@@ -257,8 +255,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	 * @param seg The segment to use for the read request.
 	 * @throws IOException Thrown, if the reader is in error.
 	 */
-	protected void sendReadRequest(MemorySegment seg) throws IOException
-	{
+	protected void sendReadRequest(MemorySegment seg) throws IOException {
 		if (this.numRequestsRemaining != 0) {
 			this.reader.readBlock(seg);
 			if (this.numRequestsRemaining != -1) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index 06b49ae..c87c308 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
@@ -27,15 +26,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
 
-
 /**
- * A {@link DataOutputView} that is backed by a {@link BlockChannelWriter}, making it effectively a data output
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a
+ * {@link BlockChannelWriter}, making it effectively a data output
  * stream. The view writes it data in blocks to the underlying channel, adding a minimal header to each block.
  * The data can be re-read by a {@link ChannelReaderInputView}, if it uses the same block size.
- *
  */
-public final class ChannelWriterOutputView extends AbstractPagedOutputView
-{
+public final class ChannelWriterOutputView extends AbstractPagedOutputView {
+	
 	/**
 	 * The magic number that identifies blocks as blocks from a ChannelWriterOutputView.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java
deleted file mode 100644
index c6a2ebe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * <p>
- * Provides a facility for deserializing objects of type <T> from an {@link InputStream}.
- * </p>
- * <p>
- * Deserializers are stateful, but must not buffer the input since other producers may read from the input between calls
- * to {@link #deserialize(Object)}.
- * </p>
- * 
- * @param <T>
- */
-public interface Deserializer<T> {
-	/**
-	 * <p>
-	 * Prepare the deserializer for reading.
-	 * </p>
-	 */
-	void open(DataInput in) throws IOException;
-
-	/**
-	 * <p>
-	 * Deserialize the next object from the underlying input stream. If the object <code>t</code> is non-null then this
-	 * deserializer <i>may</i> set its internal state to the next object read from the input stream. Otherwise, if the
-	 * object <code>t</code> is null a new deserialized object will be created.
-	 * </p>
-	 * 
-	 * @return the deserialized object
-	 */
-	T deserialize(T t) throws IOException;
-
-	/**
-	 * <p>
-	 * Clear up any resources.
-	 * </p>
-	 */
-	void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
index bd3665f..0147f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.EOFException;
@@ -25,9 +24,9 @@ import java.util.List;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
- * A {@link DataInputView} that is backed by a {@link BlockChannelReader}, making it effectively a data input
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
+ * {@link BlockChannelReader}, making it effectively a data input
  * stream. This view is similar to the {@link ChannelReaderInputView}, but does not expect
  * a header for each block, giving a direct stream abstraction over sequence of written
  * blocks. It therefore requires specification of the number of blocks and the number of
@@ -73,8 +72,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
 	
 
 	@Override
-	protected MemorySegment nextSegment(MemorySegment current) throws IOException
-	{
+	protected MemorySegment nextSegment(MemorySegment current) throws IOException {
 		// check for end-of-stream
 		if (this.numBlocksRemaining <= 0) {
 			this.reader.close();
@@ -94,8 +92,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
 	
 
 	@Override
-	protected int getLimitForSegment(MemorySegment segment)
-	{
+	protected int getLimitForSegment(MemorySegment segment) {
 		return this.numBlocksRemaining > 0 ? segment.size() : this.lastBlockBytes;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 8fced32..fc39db2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -31,13 +31,10 @@ import org.apache.flink.core.memory.MemorySegment;
 
 /**
  * The facade for the provided I/O manager services.
- * 
  */
-public final class IOManager implements UncaughtExceptionHandler
-{
-	/**
-	 * Logging.
-	 */
+public class IOManager implements UncaughtExceptionHandler {
+	
+	/** Logging */
 	private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
 	/**
@@ -85,7 +82,7 @@ public final class IOManager implements UncaughtExceptionHandler
 	/**
 	 * Constructs a new IOManager.
 	 * 
-	 * @param path The base directory path for files underlying channels.
+	 * @param tempDir The base directory path for files underlying channels.
 	 */
 	public IOManager(String tempDir) {
 		this(new String[] {tempDir});
@@ -94,12 +91,10 @@ public final class IOManager implements UncaughtExceptionHandler
 	/**
 	 * Constructs a new IOManager.
 	 * 
-	 * @param path
-	 *        the basic directory path for files underlying anonymous
-	 *        channels.
+	 * @param paths
+	 *        the basic directory paths for files underlying anonymous channels.
 	 */
-	public IOManager(String[] paths)
-	{
+	public IOManager(String[] paths) {
 		this.paths = paths;
 		this.random = new Random();
 		this.nextPath = 0;
@@ -199,7 +194,7 @@ public final class IOManager implements UncaughtExceptionHandler
 	@Override
-	public void uncaughtException(Thread t, Throwable e)
-	{
-		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
+	public void uncaughtException(Thread t, Throwable e) {
+		// the SLF4J-style Logger returned by LoggerFactory offers no fatal() level; error is its most severe one
+		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
 		shutdown();	
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 3aae114..5571ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
@@ -124,7 +124,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		// Check if we can safely run this task with the given buffers
 		ensureBufferAvailability(task);
 
-		RuntimeEnvironment environment = task.getRuntimeEnvironment();
+		RuntimeEnvironment environment = task.getEnvironment();
 
 		// -------------------------------------------------------------------------------------------------------------
 		//                                       Register output channels
@@ -132,8 +132,8 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 
 		environment.registerGlobalBufferPool(this.globalBufferPool);
 
-		if (this.localBuffersPools.containsKey(task.getVertexID())) {
-			throw new IllegalStateException("Vertex " + task.getVertexID() + " has a previous buffer pool owner");
+		if (this.localBuffersPools.containsKey(task.getExecutionId())) {
+			throw new IllegalStateException("Execution " + task.getExecutionId() + " has a previous buffer pool owner");
 		}
 
 		for (OutputGate gate : environment.outputGates()) {
@@ -155,7 +155,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 			}
 		}
 
-		this.localBuffersPools.put(task.getVertexID(), environment);
+		this.localBuffersPools.put(task.getExecutionId(), environment);
 
 		// -------------------------------------------------------------------------------------------------------------
 		//                                       Register input channels
@@ -187,10 +187,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 	/**
 	 * Unregisters the given task from the channel manager.
 	 *
-	 * @param vertexId the ID of the task to be unregistered
+	 * @param executionId the ID of the execution attempt (task) to be unregistered
 	 * @param task the task to be unregistered
 	 */
-	public void unregister(ExecutionVertexID vertexId, Task task) {
+	public void unregister(ExecutionAttemptID executionId, Task task) {
 		final Environment environment = task.getEnvironment();
 
 		// destroy and remove OUTPUT channels from registered channels and cache
@@ -222,7 +222,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		}
 
 		// clear and remove OUTPUT side buffer pool
-		LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(vertexId);
+		LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(executionId);
 		if (bufferPool != null) {
 			bufferPool.clearLocalBufferPool();
 		}
@@ -249,7 +249,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
 
 		// need at least one buffer per channel
-		if (numBuffers / numChannels < 1) {
+		if (numChannels > 0 && numBuffers / numChannels < 1) {
 			String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)",
 					this.connectionInfo.hostname(), env.getTaskName(), numChannels - numBuffers);
 
@@ -582,10 +582,6 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 	/**
 	 * 
 	 * Upon an exception, this method frees the envelope.
-	 * 
-	 * @param envelope
-	 * @return
-	 * @throws IOException
 	 */
 	private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
index 238d1e7..c167093 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.gates;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobID;
 
 /**
- * In Nephele input gates are a specialization of general gates and connect input channels and record readers. As
+ * Input gates are a specialization of general gates and connect input channels and record readers. As
  * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
  * output gates input gates can be associated with a {@link DistributionPattern} object which dictates the concrete
  * wiring between two groups of vertices.
@@ -103,14 +103,12 @@ public class InputGate<T extends IOReadableWritable> extends Gate<T> implements
 
 	@SuppressWarnings("unchecked")
 	public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor){
-		channels = new InputChannel[inputGateDescriptor.getNumberOfChannelDescriptors()];
-
-		setChannelType(inputGateDescriptor.getChannelType());
-
-		final int nicdd = inputGateDescriptor.getNumberOfChannelDescriptors();
+		List<ChannelDeploymentDescriptor> channelDescr = inputGateDescriptor.getChannels();
+		
+		channels = new InputChannel[channelDescr.size()];
 
-		for(int i = 0; i < nicdd; i++){
-			final ChannelDeploymentDescriptor cdd = inputGateDescriptor.getChannelDescriptor(i);
+		for(int i = 0; i < channelDescr.size(); i++){
+			ChannelDeploymentDescriptor cdd = channelDescr.get(i);
 			channels[i] = new InputChannel<T>(this, i, cdd.getInputChannelID(),
 					cdd.getOutputChannelID(), getChannelType());
 		}


Mime
View raw message