flink-commits mailing list archives

From: u..@apache.org
Subject: [16/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date: Mon, 12 Jan 2015 08:16:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e7f4333..5507a10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,29 +18,19 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
-import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
-import static org.apache.flink.runtime.execution.ExecutionState.DEPLOYING;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
-import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
-import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
+import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -49,14 +39,27 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
+import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
+import static org.apache.flink.runtime.execution.ExecutionState.DEPLOYING;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
+import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
+
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
  * or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
@@ -86,8 +89,8 @@ public class Execution implements Serializable {
 	
 	private static final int NUM_CANCEL_CALL_TRIES = 3;
 
-	public static FiniteDuration timeout = new FiniteDuration(
-			ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+	public static FiniteDuration timeout = new FiniteDuration(ConfigConstants
+			.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -109,13 +112,11 @@ public class Execution implements Serializable {
 	// --------------------------------------------------------------------------------------------
 	
 	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp) {
-		Preconditions.checkNotNull(vertex);
-		Preconditions.checkArgument(attemptNumber >= 0);
-		
-		this.vertex = vertex;
+		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
+		checkArgument(attemptNumber >= 0);
 		this.attemptNumber = attemptNumber;
-		
+
 		this.stateTimestamps = new long[ExecutionState.values().length];
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 	}
@@ -174,30 +175,30 @@ public class Execution implements Serializable {
 	 * @throws IllegalStateException Thrown if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 * @throws NoResourceAvailableException Thrown if no queued scheduling is allowed and no resources are currently available.
 	 */
-	public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
+	public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 		if (scheduler == null) {
 			throw new NullPointerException();
 		}
-		
+
 		final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
 		final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
-		
+
 		// sanity check
 		if (locationConstraint != null && sharingGroup == null) {
 			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
 		}
-		
+
 		if (transitionState(CREATED, SCHEDULED)) {
-			
+
 			ScheduledUnit toSchedule = locationConstraint == null ?
 				new ScheduledUnit(this, sharingGroup) :
 				new ScheduledUnit(this, sharingGroup, locationConstraint);
-		
+
 			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
 			//     in all cases where the deployment failed. We use many try {} finally {} clauses to ensure this.
 			if (queued) {
 				SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
-				
+
 				future.setFutureAction(new SlotAllocationFutureAction() {
 					@Override
 					public void slotAllocated(AllocatedSlot slot) {
@@ -227,13 +228,15 @@ public class Execution implements Serializable {
 					}
 				}
 			}
+
+			return true;
 		}
 		else {
 			// call race, already deployed
-			return;
+			return false;
 		}
 	}
-	
+
 	public void deployToSlot(final AllocatedSlot slot) throws JobException {
 		// sanity checks
 		if (slot == null) {
@@ -242,7 +245,7 @@ public class Execution implements Serializable {
 		if (!slot.isAlive()) {
 			throw new JobException("Traget slot for deployment is not alive.");
 		}
-		
+
 		// make sure exactly one deployment call happens from the correct state
 		// note: the transition from CREATED to DEPLOYING is for testing purposes only
 		ExecutionState previous = this.state;
@@ -257,14 +260,14 @@ public class Execution implements Serializable {
 			// vertex may have been cancelled, or it was already scheduled
 			throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
 		}
-		
+
 		try {
 			// good, we are allowed to deploy
 			if (!slot.setExecutedVertex(this)) {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 			this.assignedResource = slot;
-			
+
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {
 				slot.releaseSlot();
@@ -291,8 +294,9 @@ public class Execution implements Serializable {
 				public void onComplete(Throwable failure, Object success) throws Throwable {
 					if (failure != null) {
 						markFailed(failure);
-					} else {
-						TaskOperationResult result = (TaskOperationResult) success;
+					}
+					else {
+						TaskManagerMessages.TaskOperationResult result = (TaskManagerMessages.TaskOperationResult) success;
 						if (success == null) {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
 						}
@@ -305,7 +309,8 @@ public class Execution implements Serializable {
 						else {
 							// deployment failed :(
 							markFailed(new Exception("Failed to deploy the task " +
-									getVertexWithAttempt() + " to slot " + slot + ": " + result.description()));
+									getVertexWithAttempt() + " to slot " + slot + ": " + result
+									.description()));
 						}
 					}
 				}
@@ -316,8 +321,7 @@ public class Execution implements Serializable {
 			ExceptionUtils.rethrow(t);
 		}
 	}
-	
-	
+
 	public void cancel() {
 		// depending on the previous state, we go directly to cancelled (no cancel call necessary)
 		// -- or to canceling (cancel call needs to be sent to the task manager)
@@ -346,6 +350,8 @@ public class Execution implements Serializable {
 			else if (current == FINISHED || current == FAILED) {
 				// nothing to do any more. finished or failed before it could be cancelled.
 				// in any case, the task is removed from the TaskManager already
+				sendFailIntermediateResultPartitionsRPCCall();
+
 				return;
 			}
 			else if (current == CREATED || current == SCHEDULED) {
@@ -373,40 +379,84 @@ public class Execution implements Serializable {
 			}
 		}
 	}
-	
+
+	// TODO This leads to many unnecessary RPC calls in most cases
+	boolean scheduleOrUpdateConsumers(List<List<ExecutionEdge>> consumers) throws Exception {
+		boolean success = true;
+
+		if (consumers.size() != 1) {
+			throw new IllegalStateException("Only one consumer is supported currently.");
+		}
+
+		final List<ExecutionEdge> consumer = consumers.get(0);
+
+		for (ExecutionEdge edge : consumer) {
+			final ExecutionVertex consumerVertex = edge.getTarget();
+
+			final ExecutionState consumerState = consumerVertex.getExecutionState();
+
+			if (consumerState == CREATED) {
+				if (state == RUNNING) {
+					if (!consumerVertex.scheduleForExecution(consumerVertex.getExecutionGraph().getScheduler(), false)) {
+						success = false;
+					}
+				}
+				else {
+					success = false;
+				}
+			}
+			else if (consumerState == RUNNING) {
+				AllocatedSlot consumerSlot = consumerVertex.getCurrentAssignedResource();
+				ExecutionAttemptID consumerExecutionId = consumerVertex.getCurrentExecutionAttempt().getAttemptId();
+
+				PartitionInfo partitionInfo = PartitionInfo.fromEdge(edge, consumerSlot);
+
+				if (!sendUpdateTaskRpcCall(consumerSlot, consumerExecutionId, edge.getSource().getIntermediateResult().getId(), partitionInfo)) {
+					success = false;
+				}
+
+			}
+			else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
+				success = false;
+			}
+		}
+
+		return success;
+	}
+
 	/**
 	 * This method fails the vertex due to an external condition. The task will move to state FAILED.
 	 * If the task was in state RUNNING or DEPLOYING before, it will send a cancel call to the TaskManager.
-	 * 
+	 *
 	 * @param t The exception that caused the task to fail.
 	 */
 	public void fail(Throwable t) {
 		processFail(t, false);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//   Callbacks
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * This method marks the task as failed, but will make no attempt to remove task execution from the task manager.
 	 * It is intended for cases where the task is known not to be running, or when the TaskManager reports failure
 	 * (in which case it has already removed the task).
-	 * 
+	 *
 	 * @param t The exception that caused the task to fail.
 	 */
 	void markFailed(Throwable t) {
 		processFail(t, true);
 	}
-	
+
 	void markFinished() {
-		
+
 		// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
 		while (true) {
 			ExecutionState current = this.state;
-			
+
 			if (current == RUNNING || current == DEPLOYING) {
-			
+
 				if (transitionState(current, FINISHED)) {
 					try {
 						assignedResource.releaseSlot();
@@ -437,17 +487,17 @@ public class Execution implements Serializable {
 			}
 		}
 	}
-	
+
 	void cancelingComplete() {
-		
+
 		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
 		// network stack is canceled (for example by a failing / canceling receiver or sender)
 		// this is an artifact of the old network runtime, but for now we need to support task transitions
 		// from running directly to canceled
-		
+
 		while (true) {
 			ExecutionState current = this.state;
-			
+
 			if (current == CANCELED) {
 				return;
 			}
@@ -462,9 +512,9 @@ public class Execution implements Serializable {
 					}
 					return;
 				}
-				
+
 				// else fall through the loop
-			} 
+			}
 			else {
 				// failing in the meantime may happen and is no problem.
 				// anything else is a serious problem !!!
@@ -477,26 +527,26 @@ public class Execution implements Serializable {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Internal Actions
 	// --------------------------------------------------------------------------------------------
-	
+
 	private boolean processFail(Throwable t, boolean isCallback) {
-		
+
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
-		
+
 		// we may need to loop multiple times (in the presence of concurrent calls) in order to
 		// atomically switch to failed 
 		while (true) {
 			ExecutionState current = this.state;
-			
+
 			if (current == FAILED) {
 				// already failed. It is enough to remember once that we failed (it's sad enough)
 				return false;
 			}
-			
+
 			if (current == CANCELED) {
 				// we are already aborting or are already aborted
 				if (LOG.isDebugEnabled()) {
@@ -505,11 +555,11 @@ public class Execution implements Serializable {
 				}
 				return false;
 			}
-			
+
 			if (transitionState(current, FAILED, t)) {
 				// success (in a manner of speaking)
 				this.failureCause = t;
-				
+
 				try {
 					if (assignedResource != null) {
 						assignedResource.releaseSlot();
@@ -524,7 +574,7 @@ public class Execution implements Serializable {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
 					}
-					
+
 					try {
 						if (assignedResource != null) {
 							sendCancelRpcCall();
@@ -534,15 +584,15 @@ public class Execution implements Serializable {
 						LOG.error("Error triggering cancel call while marking task as failed.", tt);
 					}
 				}
-				
+
 				// leave the loop
 				return true;
 			}
 		}
 	}
-	
+
 	private boolean switchToRunning() {
-		
+
 		if (transitionState(DEPLOYING, RUNNING)) {
 			return true;
 		}
@@ -552,9 +602,9 @@ public class Execution implements Serializable {
 			//  - canceling, while deployment was in progress. state is now canceling, or canceled, if the response overtook
 			//  - finishing (execution and finished call overtook the deployment answer, which is possible and happens for fast tasks)
 			//  - failed (execution, failure, and failure message overtook the deployment answer)
-			
+
 			ExecutionState currentState = this.state;
-			
+
 			if (currentState == FINISHED || currentState == CANCELED) {
 				// do nothing, the task was really fast (nice)
 				// or it was canceled really fast
@@ -568,22 +618,22 @@ public class Execution implements Serializable {
 			else {
 				String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.",
 						getVertexWithAttempt(), currentState);
-				
+
 				if (LOG.isDebugEnabled()) {
 					LOG.debug(message);
 				}
-				
+
 				// undo the deployment
 				sendCancelRpcCall();
-				
+
 				// record the failure
 				markFailed(new Exception(message));
 			}
-			
+
 			return false;
 		}
 	}
-	
+
 	private void sendCancelRpcCall() {
 		final AllocatedSlot slot = this.assignedResource;
 		if (slot == null) {
@@ -591,7 +641,7 @@ public class Execution implements Serializable {
 		}
 
 		Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
-				TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
+						TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
 				AkkaUtils.globalExecutionContext(), timeout);
 
 		cancelResult.onComplete(new OnComplete<Object>(){
@@ -601,7 +651,7 @@ public class Execution implements Serializable {
 				if(failure != null){
 					fail(new Exception("Task could not be canceled.", failure));
 				}else{
-					TaskOperationResult result = (TaskOperationResult)success;
+					TaskManagerMessages.TaskOperationResult result = (TaskManagerMessages.TaskOperationResult)success;
 					if(!result.success()){
 						LOG.debug("Cancel task call did not find task. Probably akka message call" +
 								" race.");
@@ -611,44 +661,80 @@ public class Execution implements Serializable {
 		}, AkkaUtils.globalExecutionContext());
 	}
 
+	private void sendFailIntermediateResultPartitionsRPCCall() {
+		final AllocatedSlot slot = this.assignedResource;
+		if (slot == null) {
+			return;
+		}
+
+		final Instance instance = slot.getInstance();
+
+		if (instance.isAlive()) {
+			try {
+				// TODO For some tests this could be a problem when querying too early if all resources were released
+				instance.getTaskManager().tell(new TaskManagerMessages.FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
+			}
+			catch (Throwable t) {
+				fail(new Exception("Intermediate result partition could not be failed.", t));
+			}
+		}
+	}
+
+	private boolean sendUpdateTaskRpcCall(final AllocatedSlot consumerSlot, final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws Exception {
+		final Instance instance = consumerSlot.getInstance();
+
+		final TaskManagerMessages.TaskOperationResult result = AkkaUtils.ask(
+				instance.getTaskManager(), new TaskManagerMessages.UpdateTask(executionId, resultId, partitionInfo), timeout);
+
+		if (!result.success()) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Update task {} was unsuccessful (maybe an RPC race): {}", executionId, result.description());
+			}
+
+			return false;
+		}
+
+		return true;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Miscellaneous
 	// --------------------------------------------------------------------------------------------
-	
+
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
 		return transitionState(currentState, targetState, null);
 	}
-	
+
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
 		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
 			markTimestamp(targetState);
-			
+
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners) may not affect the main logic
 			try {
 				vertex.notifyStateTransition(attemptId, targetState, error);
 			}
 			catch (Throwable t) {
-				LOG.error("Error while notifying execution graph of execution state trnsition.", t);
+				LOG.error("Error while notifying execution graph of execution state transition.", t);
 			}
 			return true;
 		} else {
 			return false;
 		}
 	}
-	
+
 	private void markTimestamp(ExecutionState state) {
 		markTimestamp(state, System.currentTimeMillis());
 	}
-	
+
 	private void markTimestamp(ExecutionState state, long timestamp) {
 		this.stateTimestamps[state.ordinal()] = timestamp;
 	}
-	
+
 	public String getVertexWithAttempt() {
 		return vertex.getSimpleName() + " - execution #" + attemptNumber;
 	}
-	
+
 	@Override
 	public String toString() {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),

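The boolean return value that scheduleForExecution() gains in this change lets callers such as
scheduleOrUpdateConsumers() detect the CREATED -> SCHEDULED race without relying on exceptions.
A minimal, hypothetical caller sketch (illustrative only, not part of this commit):

	// Hypothetical caller: act on the boolean result instead of assuming
	// the state transition succeeded.
	boolean scheduled = execution.scheduleForExecution(scheduler, false);
	if (!scheduled) {
		// A concurrent call already moved this execution out of CREATED,
		// so the other caller owns the deployment and there is nothing to do.
	}
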
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
index 6186a97..f5662f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.flink.runtime.AbstractID;
 
 /**
@@ -25,6 +26,19 @@ import org.apache.flink.runtime.AbstractID;
  * in cases of failures and recovery.
  */
 public class ExecutionAttemptID extends AbstractID {
-	
+
 	private static final long serialVersionUID = -1169683445778281344L;
+
+	public ExecutionAttemptID() {
+	}
+
+	public ExecutionAttemptID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
+		long lower = buf.readLong();
+		long upper = buf.readLong();
+		return new ExecutionAttemptID(lower, upper);
+	}
 }

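The new fromByteBuf(...) factory reads the two long halves of the ID in the order a writer is
expected to emit them: lower part first, then upper part. A minimal round-trip sketch, assuming
the getLowerPart()/getUpperPart() accessors inherited from AbstractID and Netty's Unpooled
helper (the sketch itself is not part of this commit):

	import io.netty.buffer.ByteBuf;
	import io.netty.buffer.Unpooled;

	ByteBuf buf = Unpooled.buffer(16);           // 2 x 8 bytes
	ExecutionAttemptID id = new ExecutionAttemptID();
	buf.writeLong(id.getLowerPart());            // writer side: lower half first...
	buf.writeLong(id.getUpperPart());            // ...then the upper half
	ExecutionAttemptID copy = ExecutionAttemptID.fromByteBuf(buf);
	assert id.equals(copy);                      // equal IDs, both halves match
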
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 92ca394..0d81fde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -18,53 +18,29 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
 public class ExecutionEdge {
 
 	private final IntermediateResultPartition source;
-	
+
 	private final ExecutionVertex target;
-	
+
 	private final int inputNum;
 
-	private ChannelID inputChannelId;
-	
-	private ChannelID outputChannelId;
-	
-	
 	public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
 		this.source = source;
 		this.target = target;
 		this.inputNum = inputNum;
-		
-		this.inputChannelId = new ChannelID();
-		this.outputChannelId = new ChannelID();
 	}
-	
-	
+
 	public IntermediateResultPartition getSource() {
 		return source;
 	}
-	
+
 	public ExecutionVertex getTarget() {
 		return target;
 	}
-	
+
 	public int getInputNum() {
 		return inputNum;
 	}
-	
-	public ChannelID getInputChannelId() {
-		return inputChannelId;
-	}
-	
-	public ChannelID getOutputChannelId() {
-		return outputChannelId;
-	}
-	
-	public void assignNewChannelIDs() {
-		inputChannelId = new ChannelID();
-		outputChannelId = new ChannelID();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dd9fdb1..2de74f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,33 +18,12 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 import akka.actor.ActorRef;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -53,23 +32,36 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged;
 import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static akka.dispatch.Futures.future;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static akka.dispatch.Futures.future;
 
 public class ExecutionGraph implements Serializable {
+
 	static final long serialVersionUID = 42L;
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
-	
+
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/** The ID of the job this graph has been built for. */
 	private final JobID jobID;
 
@@ -78,67 +70,61 @@ public class ExecutionGraph implements Serializable {
 
 	/** The job configuration that was originally attached to the JobGraph. */
 	private transient final Configuration jobConfiguration;
-	
+
 	/** The classloader of the user code. */
 	private final ClassLoader userClassLoader;
-	
+
 	/** All job vertices that are part of this graph */
 	private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
-	
+
 	/** All vertices, in the order in which they were created **/
 	private final List<ExecutionJobVertex> verticesInCreationOrder;
 
 	/** All intermediate results that are part of this graph */
-	private transient final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>
-	intermediateResults;
-	
+	private transient final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
+
 	/** The currently executed tasks, for callbacks */
 	private transient final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
 
-	private transient final Map<ChannelID, ExecutionEdge> edges = new HashMap<ChannelID,
-			ExecutionEdge>();
-	
 	private transient final List<BlobKey> requiredJarFiles;
-	
+
 	private transient final List<ActorRef> jobStatusListenerActors;
 
 	private transient final List<ActorRef> executionListenerActors;
-	
+
 	private final long[] stateTimestamps;
-	
+
 	private transient final Object progressLock = new Object();
-	
+
 	private int nextVertexToFinish;
-	
+
 	private int numberOfRetriesLeft;
-	
+
 	private long delayBeforeRetrying;
-	
+
 	private volatile JobStatus state = JobStatus.CREATED;
-	
+
 	private volatile Throwable failureCause;
-	
-	
+
 	private transient Scheduler scheduler;
-	
+
 	private boolean allowQueuedScheduling = true;
-	
-	
+
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
 		this(jobId, jobName, jobConfig, new ArrayList<BlobKey>());
 	}
-	
-	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,
-						List<BlobKey> requiredJarFiles) {
+
+	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, List<BlobKey> requiredJarFiles) {
 		this(jobId, jobName, jobConfig, requiredJarFiles, Thread.currentThread().getContextClassLoader());
 	}
-	
+
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, 
 			List<BlobKey> requiredJarFiles, ClassLoader userClassLoader) {
+
 		if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
 			throw new NullPointerException();
 		}
-		
+
 		this.jobID = jobId;
 		this.jobName = jobName;
 		this.jobConfiguration = jobConfig;
@@ -148,10 +134,10 @@ public class ExecutionGraph implements Serializable {
 		this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
 		this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
 		this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
-		
+
 		this.jobStatusListenerActors  = new CopyOnWriteArrayList<ActorRef>();
 		this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
-		
+
 		this.stateTimestamps = new long[JobStatus.values().length];
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
 
@@ -159,29 +145,29 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
 		if (numberOfRetriesLeft < -1) {
 			throw new IllegalArgumentException();
 		}
 		this.numberOfRetriesLeft = numberOfRetriesLeft;
 	}
-	
+
 	public int getNumberOfRetriesLeft() {
 		return numberOfRetriesLeft;
 	}
-	
+
 	public void setDelayBeforeRetrying(long delayBeforeRetrying) {
 		if (delayBeforeRetrying < 0) {
 			throw new IllegalArgumentException("Delay before retry must be non-negative.");
 		}
 		this.delayBeforeRetrying = delayBeforeRetrying;
 	}
-	
+
 	public long getDelayBeforeRetrying() {
 		return delayBeforeRetrying;
 	}
-	
+
 	public void attachJobGraph(List<AbstractJobVertex> topologicallySorted) throws JobException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -223,39 +209,43 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	public Scheduler getScheduler() {
+		return scheduler;
+	}
+
 	public JobID getJobID() {
 		return jobID;
 	}
-	
+
 	public String getJobName() {
 		return jobName;
 	}
-	
+
 	public Configuration getJobConfiguration() {
 		return jobConfiguration;
 	}
-	
+
 	public ClassLoader getUserClassLoader() {
 		return this.userClassLoader;
 	}
-	
+
 	public JobStatus getState() {
 		return state;
 	}
-	
+
 	public Throwable getFailureCause() {
 		return failureCause;
 	}
-	
+
 	public ExecutionJobVertex getJobVertex(JobVertexID id) {
 		return this.tasks.get(id);
 	}
-	
+
 	public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
 		return Collections.unmodifiableMap(this.tasks);
 	}
-	
+
 	public Iterable<ExecutionJobVertex> getVerticesTopologically() {
 		// we return a specific iterator that does not fail with concurrent modifications
 		// the list is append only, so it is safe for that
@@ -289,11 +279,11 @@ public class ExecutionGraph implements Serializable {
 			}
 		};
 	}
-	
+
 	public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
 		return Collections.unmodifiableMap(this.intermediateResults);
 	}
-	
+
 	public Iterable<ExecutionVertex> getAllExecutionVertices() {
 		return new Iterable<ExecutionVertex>() {
 			@Override
@@ -302,28 +292,30 @@ public class ExecutionGraph implements Serializable {
 			}
 		};
 	}
-	
+
 	public long getStatusTimestamp(JobStatus status) {
 		return this.stateTimestamps[status.ordinal()];
 	}
-	
+
 	public boolean isQueuedSchedulingAllowed() {
 		return this.allowQueuedScheduling;
 	}
-	
+
 	public void setQueuedSchedulingAllowed(boolean allowed) {
 		this.allowQueuedScheduling = allowed;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+	//  Actions
+	// --------------------------------------------------------------------------------------------
+
 	public void scheduleForExecution(Scheduler scheduler) throws JobException {
 		if (scheduler == null) {
 			throw new IllegalArgumentException("Scheduler must not be null.");
 		}
 		
 		if (this.scheduler != null && this.scheduler != scheduler) {
-			throw new IllegalArgumentException("Cann not use different schedulers for the same job");
+			throw new IllegalArgumentException("Cannot use different schedulers for the same job");
 		}
 		
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
@@ -342,7 +334,7 @@ public class ExecutionGraph implements Serializable {
 			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
 		}
 	}
-	
+
 	public void cancel() {
 		while (true) {
 			JobStatus current = state;
@@ -361,7 +353,7 @@ public class ExecutionGraph implements Serializable {
 			}
 		}
 	}
-	
+
 	public void fail(Throwable t) {
 		while (true) {
 			JobStatus current = state;
@@ -383,11 +375,10 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-
 	private boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
-	
+
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
 		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
 			stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
@@ -398,7 +389,7 @@ public class ExecutionGraph implements Serializable {
 			return false;
 		}
 	}
-	
+
 	void jobVertexInFinalState(ExecutionJobVertex ev) {
 		synchronized (progressLock) {
 			int nextPos = nextVertexToFinish;
@@ -457,18 +448,18 @@ public class ExecutionGraph implements Serializable {
 							fail(new Exception("ExecutionGraph went into final state from state " + current));
 						}
 					}
-					
+
 					// also, notify waiters
 					progressLock.notifyAll();
 				}
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Callbacks and Callback Utilities
 	// --------------------------------------------------------------------------------------------
-	
+
 	public boolean updateState(TaskExecutionState state) {
 		Execution attempt = this.currentExecutions.get(state.getID());
 		if (attempt != null) {
@@ -493,181 +484,63 @@ public class ExecutionGraph implements Serializable {
 			return false;
 		}
 	}
-	
-	public ConnectionInfoLookupResponse lookupConnectionInfoAndDeployReceivers(InstanceConnectionInfo caller, ChannelID sourceChannelID) {
-		
-		final ExecutionEdge edge = edges.get(sourceChannelID);
-		if (edge == null) {
-			// that is bad, we need to fail the job
-			LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
-			fail(new Exception("Channels are not correctly registered"));
-			return ConnectionInfoLookupResponse.createReceiverNotFound();
-		}
-		
-		
-		//  ----- Request was sent from an input channel (receiver side), requesting the output channel (sender side) ------
-		//  -----                               This is the case for backwards events                                 ------
-
-		if (sourceChannelID.equals(edge.getInputChannelId())) {
-			final ExecutionVertex targetVertex = edge.getSource().getProducer();
-			final ExecutionState executionState = targetVertex.getExecutionState();
-			
-			// common case - found task running
-			if (executionState == ExecutionState.RUNNING) {
-				Instance location = targetVertex.getCurrentAssignedResource().getInstance();
 
-				if (location.getInstanceConnectionInfo().equals(caller)) {
-					// Receiver runs on the same task manager
-					return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelId());
-				}
-				else {
-					// Receiver runs on a different task manager
-					final InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
-					final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+	public boolean scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) throws Exception {
+		Execution execution = currentExecutions.get(executionId);
 
-					int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
-					return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
-				}
-			}
-			else if (executionState == ExecutionState.FINISHED) {
-				// that should not happen. if there is data pending, the sender cannot yet be done
-				// we need to fail the whole affair
-				LOG.error("Receiver " + targetVertex + " set to FINISHED even though data is pending");
-				fail(new Exception("Channels are not correctly registered"));
-				return ConnectionInfoLookupResponse.createReceiverNotFound();
-			}
-			else if (executionState == ExecutionState.FAILED || executionState == ExecutionState.CANCELED ||
-					executionState == ExecutionState.CANCELING)
-			{
-				return ConnectionInfoLookupResponse.createJobIsAborting();
-			}
-			else {
-				// all other states should not be, because the sender cannot be in CREATED, SCHEDULED, or DEPLOYING
-				// state when the receiver is already running
-				LOG.error("Channel lookup (backwards) - sender " + targetVertex + " found in inconsistent state " + executionState);
-				fail(new Exception("Channels are not correctly registered"));
-				return ConnectionInfoLookupResponse.createReceiverNotFound();
-			}
+		if (execution == null) {
+			throw new IllegalStateException("Cannot find execution for execution ID " + executionId);
 		}
-		
-		//  ----- Request was sent from an output channel (sender side), requesting the input channel (receiver side) ------
-		//  -----                                 This is the case for forward data                                   ------
-		
-		final ExecutionVertex targetVertex = edge.getTarget();
-		final ExecutionState executionState = targetVertex.getExecutionState();
 
-		if (executionState == ExecutionState.RUNNING) {
-			
-			// already online
-			Instance location = targetVertex.getCurrentAssignedResource().getInstance();
-			
-			if (location.getInstanceConnectionInfo().equals(caller)) {
-				// Receiver runs on the same task manager
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelId());
-			}
-			else {
-				// Receiver runs on a different task manager
-				final InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
-				final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-
-				final int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
-				return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
-			}
-		}
-		else if (executionState == ExecutionState.DEPLOYING || executionState == ExecutionState.SCHEDULED) {
-			return ConnectionInfoLookupResponse.createReceiverNotReady();
-		}
-		else if (executionState == ExecutionState.CREATED) {
-			// bring the receiver online
-			try {
-				edge.getTarget().scheduleForExecution(scheduler, false);
-				
-				// delay the requester
-				return ConnectionInfoLookupResponse.createReceiverNotReady();
-			}
-			catch (JobException e) {
-				fail(new Exception("Cannot schedule the receivers, not enough resources", e));
-				return ConnectionInfoLookupResponse.createJobIsAborting();
-			}
-		}
-		else if (executionState == ExecutionState.CANCELED || executionState == ExecutionState.CANCELING ||
-				executionState == ExecutionState.FAILED)
-		{
-			return ConnectionInfoLookupResponse.createJobIsAborting();
-		}
-		else {
-			// illegal state for all other states - or all the other state, since the only remaining state is FINISHED
-			// state when the receiver is already running
-			String message = "Channel lookup (forward) - receiver " + targetVertex + " found in inconsistent state " + executionState;
-			LOG.error(message);
-			fail(new Exception(message));
-			return ConnectionInfoLookupResponse.createReceiverNotFound();
-		}
+		return execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
 	}
-	
+
 	public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
 		return Collections.unmodifiableMap(currentExecutions);
 	}
-	
+
 	void registerExecution(Execution exec) {
 		Execution previous = currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
 		if (previous != null) {
 			fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
 		}
 	}
-	
+
 	void deregisterExecution(Execution exec) {
 		Execution contained = currentExecutions.remove(exec.getAttemptId());
-		
+
 		if (contained != null && contained != exec) {
 			fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
 		}
 	}
-	
-	void registerExecutionEdge(ExecutionEdge edge) {
-		ChannelID target = edge.getInputChannelId();
-		ChannelID source = edge.getOutputChannelId();
-		edges.put(source, edge);
-		edges.put(target, edge);
-	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void registerJobStatusListener(ActorRef listener){
 		this.jobStatusListenerActors.add(listener);
-
 	}
 
 	public void registerExecutionListener(ActorRef listener){
 		this.executionListenerActors.add(listener);
 	}
-	
+
 	/**
 	 * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
-	 * 
-	 * @param newState
-	 * @param error
 	 */
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if(jobStatusListenerActors.size() > 0){
 			String message = error == null ? null : ExceptionUtils.stringifyException(error);
 			for(ActorRef listener: jobStatusListenerActors){
-				listener.tell(new JobStatusChanged(jobID, newState, System.currentTimeMillis(),
+				listener.tell(new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
 								message), ActorRef.noSender());
 			}
 		}
-
 	}
-	
+
 	/**
 	 * NOTE: This method never throws an error, only logs errors caused by the notified listeners.
-	 * 
-	 * @param vertexId
-	 * @param subtask
-	 * @param newExecutionState
-	 * @param error
 	 */
 	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
 							newExecutionState, Throwable error) {
@@ -703,7 +576,6 @@ public class ExecutionGraph implements Serializable {
 				}
 
 				this.currentExecutions.clear();
-				this.edges.clear();
 
 				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
 					jv.resetForNewExecution();
@@ -717,7 +589,8 @@ public class ExecutionGraph implements Serializable {
 			}
 
 			scheduleForExecution(scheduler);
-		} catch (Throwable t) {
+		}
+		catch (Throwable t) {
 			fail(t);
 		}
 	}

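The new scheduleOrUpdateConsumers(ExecutionAttemptID, int) entry point replaces the removed
ChannelID lookup path: the JobManager now resolves the producing execution attempt and lets it
bring the consumers of one produced partition up to date. A hypothetical call-site sketch (the
surrounding message handling is illustrative, not part of this commit):

	// Hypothetical JobManager-side handling of a producer notification.
	// partitionIndex identifies which produced intermediate result partition
	// has data available for its consumers.
	boolean allConsumersReady =
			executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex);
	if (!allConsumersReady) {
		// Some consumers were not yet in a state that could be scheduled or
		// updated; how the caller retries is outside this excerpt.
	}
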
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0cf944d..816ba1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,38 +18,43 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.flink.runtime.blob.BlobKey;
-import org.slf4j.Logger;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each
  * time spawning an {@link Execution}.
  */
 public class ExecutionVertex implements Serializable {
+
 	static final long serialVersionUID = 42L;
 
 	@SuppressWarnings("unused")
@@ -78,28 +83,29 @@ public class ExecutionVertex implements Serializable {
 	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
 		this(jobVertex, subTaskIndex, producedDataSets, System.currentTimeMillis());
 	}
-	
+
 	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, long createTimestamp) {
 		this.jobVertex = jobVertex;
 		this.subTaskIndex = subTaskIndex;
-		
+
 		this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
 		for (int i = 0; i < producedDataSets.length; i++) {
 			IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
 			this.resultPartitions[i] = irp;
 			producedDataSets[i].setPartition(subTaskIndex, irp);
 		}
-		
+
 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
-		
+
 		this.currentExecution = new Execution(this, 0, createTimestamp);
-		
+
 		// create a co-location scheduling hint, if necessary
 		CoLocationGroup clg = jobVertex.getCoLocationGroup();
 		if (clg != null) {
 			this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
-		} else {
+		}
+		else {
 			this.locationConstraint = null;
 		}
 	}
@@ -181,14 +187,14 @@ public class ExecutionVertex implements Serializable {
 		final DistributionPattern pattern = edge.getDistributionPattern();
 		final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
 		
-		ExecutionEdge[] edges = null;
+		ExecutionEdge[] edges;
 		
 		switch (pattern) {
 			case POINTWISE:
 				edges = connectPointwise(sourcePartitions, inputNumber);
 				break;
-				
-			case BIPARTITE: 
+
+			case ALL_TO_ALL:
 				edges = connectAllToAll(sourcePartitions, inputNumber);
 				break;
 				
@@ -199,14 +205,11 @@ public class ExecutionVertex implements Serializable {
 		
 		this.inputEdges[inputNumber] = edges;
 		
-		ExecutionGraph graph = getExecutionGraph();
-		
 		// add the consumers to the source
 		// for now (until the receiver-initiated handshake is in place), we need to register the
 		// edges at the execution graph
 		for (ExecutionEdge ee : edges) {
 			ee.getSource().addConsumer(ee, consumerNumber);
-			graph.registerExecutionEdge(ee);
 		}
 	}
 	
@@ -224,15 +227,15 @@ public class ExecutionVertex implements Serializable {
 	private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
 		final int numSources = sourcePartitions.length;
 		final int parallelism = getTotalNumberOfParallelSubtasks();
-		
+	
 		// simple case same number of sources as targets
 		if (numSources == parallelism) {
 			return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
 		}
 		else if (numSources < parallelism) {
-			
+
 			int sourcePartition;
-			
+
 			// check if the pattern is regular or irregular
 			// we use int arithmetics for regular, and floating point with rounding for irregular
 			if (parallelism % numSources == 0) {
@@ -245,7 +248,7 @@ public class ExecutionVertex implements Serializable {
 				float factor = ((float) parallelism) / numSources;
 				sourcePartition = (int) (subTaskIndex / factor);
 			}
-			
+
 			return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
 		}
 		else {
@@ -253,7 +256,7 @@ public class ExecutionVertex implements Serializable {
 				// same number of targets per source
 				int factor = numSources / parallelism;
 				int startIndex = subTaskIndex * factor;
-				
+
 				ExecutionEdge[] edges = new ExecutionEdge[factor];
 				for (int i = 0; i < factor; i++) {
 					edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
@@ -262,17 +265,17 @@ public class ExecutionVertex implements Serializable {
 			}
 			else {
 				float factor = ((float) numSources) / parallelism;
-				
+
 				int start = (int) (subTaskIndex * factor);
 				int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
 						sourcePartitions.length : 
 						(int) ((subTaskIndex + 1) * factor);
-				
+
 				ExecutionEdge[] edges = new ExecutionEdge[end - start];
 				for (int i = 0; i < edges.length; i++) {
 					edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber);
 				}
-				
+
 				return edges;
 			}
 		}
@@ -305,16 +308,16 @@ public class ExecutionVertex implements Serializable {
 		}
 		return locations;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//   Actions
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void resetForNewExecution() {
 		synchronized (priorExecutions) {
 			Execution execution = currentExecution;
 			ExecutionState state = execution.getState();
-			
+
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
@@ -323,110 +326,118 @@ public class ExecutionVertex implements Serializable {
 				if (grp != null) {
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
-				
-				// temp: assign new channel IDs.
-				ExecutionGraph graph = getExecutionGraph();
-				
-				for (ExecutionEdge[] input : this.inputEdges) {
-					for (ExecutionEdge e : input) {
-						e.assignNewChannelIDs();
-						graph.registerExecutionEdge(e);
-					}
-				}
 			}
 			else {
 				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
 			}
 		}
 	}
-	
-	public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
-		this.currentExecution.scheduleForExecution(scheduler, queued);
+
+	public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
+		return this.currentExecution.scheduleForExecution(scheduler, queued);
 	}
-	
+
 	public void deployToSlot(AllocatedSlot slot) throws JobException {
 		this.currentExecution.deployToSlot(slot);
 	}
-	
+
 	public void cancel() {
 		this.currentExecution.cancel();
 	}
-	
+
 	public void fail(Throwable t) {
 		this.currentExecution.fail(t);
 	}
-	
+
+	/**
+	 * Schedules or updates the {@link IntermediateResultPartition} consumer
+	 * tasks of the intermediate result partition with the given index.
+	 */
+	boolean scheduleOrUpdateConsumers(int partitionIndex) throws Exception {
+		checkElementIndex(partitionIndex, resultPartitions.length);
+
+		IntermediateResultPartition partition = resultPartitions[partitionIndex];
+
+		return currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
-	
+
 	void executionFinished() {
 		jobVertex.vertexFinished(subTaskIndex);
 	}
-	
+
 	void executionCanceled() {
 		jobVertex.vertexCancelled(subTaskIndex);
 	}
-	
+
 	void executionFailed(Throwable t) {
 		jobVertex.vertexFailed(subTaskIndex, t);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//   Miscellaneous
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Simply forward this notification. This is for logs and event archivers.
-	 * 
-	 * @param executionId
-	 * @param newState
-	 * @param error
 	 */
 	void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
 		getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
 	}
 	
 	TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) {
-		//  create the input gate deployment descriptors
-		List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
-		for (ExecutionEdge[] channels : inputEdges) {
-			inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
-		}
-		
-		// create the output gate deployment descriptors
-		List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
+		// Produced intermediate results
+		List<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>(resultPartitions.length);
+
 		for (IntermediateResultPartition partition : resultPartitions) {
-			for (List<ExecutionEdge> channels : partition.getConsumers()) {
-				outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
-			}
+			producedPartitions.add(PartitionDeploymentDescriptor.fromIntermediateResultPartition(partition));
 		}
-		
+
+		// Consumed intermediate results
+		List<PartitionConsumerDeploymentDescriptor> consumedPartitions = new ArrayList<PartitionConsumerDeploymentDescriptor>();
+
+		for (ExecutionEdge[] edges : inputEdges) {
+			PartitionInfo[] partitions = PartitionInfo.fromEdges(edges, slot);
+
+			// If the produced partition has multiple consumers registered, we
+			// need to request the one matching our subtask index.
+			// TODO Refactor after removing the consumers from the intermediate result partitions
+			int numConsumerEdges = edges[0].getSource().getConsumers().get(0).size();
+
+			int queueToRequest = subTaskIndex % numConsumerEdges;
+
+			IntermediateDataSetID resultId = edges[0].getSource().getIntermediateResult().getId();
+
+			consumedPartitions.add(new PartitionConsumerDeploymentDescriptor(resultId, partitions, queueToRequest));
+		}
+
 		List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
-		
-		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(), 
-				subTaskIndex, getTotalNumberOfParallelSubtasks(), 
-				getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
-				jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles, slot.getSlotNumber());
+
+		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
+				subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
+				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
+				producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber());
 	}
-	
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a simple name representation in the style 'taskname (x/y)', where
 	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
 	 * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
 	 * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
-	 * 
+	 *
 	 * @return A simple name representation.
 	 */
 	public String getSimpleName() {
 		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
 	}
-	
+
 	@Override
 	public String toString() {
 		return getSimpleName();
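
A quick worked example of the queue selection in createDeploymentDescriptor above: with
one produced partition feeding multiple consumer edges, the modulo maps consuming
subtasks onto queues round-robin. A minimal standalone sketch (illustrative names, not
part of this commit):

public class QueueSelectionSketch {
	public static void main(String[] args) {
		// Assumption: each produced partition exposes one queue per consumer
		// edge, and a consuming subtask requests queue (subTaskIndex % queues),
		// mirroring the computation in createDeploymentDescriptor.
		int numConsumerEdges = 2;
		for (int subTaskIndex = 0; subTaskIndex < 4; subTaskIndex++) {
			int queueToRequest = subTaskIndex % numConsumerEdges;
			System.out.println("consumer subtask " + subTaskIndex + " -> queue " + queueToRequest);
		}
	}
}

For subtasks 0 through 3 this requests queues 0, 1, 0, 1.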

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index a5c0613..64ad2d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -19,72 +19,80 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
 
 public class IntermediateResult {
 
 	private final IntermediateDataSetID id;
-	
+
 	private final ExecutionJobVertex producer;
-	
+
 	private final IntermediateResultPartition[] partitions;
-	
+
 	private final int numParallelProducers;
-	
+
 	private int partitionsAssigned;
-	
+
 	private int numConsumers;
-	
+
 	private final int connectionIndex;
-	
-	
+
+	private final IntermediateResultPartitionType resultType;
+
 	public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
 		this.id = id;
 		this.producer = producer;
 		this.partitions = new IntermediateResultPartition[numParallelProducers];
 		this.numParallelProducers = numParallelProducers;
-		
+
 		// we do not set the intermediate result partitions here, because we let them be initialized by
 		// the execution vertex that produces them
-		
+
 		// assign a random connection index
 		this.connectionIndex = (int) (Math.random() * Integer.MAX_VALUE);
+
+		// The runtime type for this produced result
+		// TODO The JobGraph generator has to decide which type of result this is
+		this.resultType = IntermediateResultPartitionType.PIPELINED;
 	}
-	
+
 	public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
 		if (partition == null || partitionNumber < 0 || partitionNumber >= numParallelProducers) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		if (partitions[partitionNumber] != null) {
 			throw new IllegalStateException("Partition #" + partitionNumber + " has already been assigned.");
 		}
-		
+
 		partitions[partitionNumber] = partition;
 		partitionsAssigned++;
 	}
-	
-	
-	
+
 	public IntermediateDataSetID getId() {
 		return id;
 	}
-	
+
 	public ExecutionJobVertex getProducer() {
 		return producer;
 	}
-	
+
 	public IntermediateResultPartition[] getPartitions() {
 		return partitions;
 	}
-	
+
 	public int getNumberOfAssignedPartitions() {
 		return partitionsAssigned;
 	}
-	
+
+	public IntermediateResultPartitionType getResultType() {
+		return resultType;
+	}
+
 	public int registerConsumer() {
 		final int index = numConsumers;
 		numConsumers++;
-		
+
 		for (IntermediateResultPartition p : partitions) {
 			if (p.addConsumerGroup() != index) {
 				throw new RuntimeException("Inconsistent consumer mapping between intermediate result partitions.");
@@ -92,7 +100,7 @@ public class IntermediateResult {
 		}
 		return index;
 	}
-	
+
 	public int getConnectionIndex() {
 		return connectionIndex;
 	}
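
The registerConsumer/addConsumerGroup handshake above only works if every partition
hands out the same group index. A stripped-down sketch of that invariant (a toy
Partition type, not the commit's classes):

import java.util.ArrayList;
import java.util.List;

public class ConsumerRegistrationSketch {
	static class Partition {
		final List<List<Object>> consumers = new ArrayList<List<Object>>();
		int addConsumerGroup() {
			consumers.add(new ArrayList<Object>());
			return consumers.size() - 1;
		}
	}

	public static void main(String[] args) {
		Partition[] partitions = { new Partition(), new Partition(), new Partition() };
		int index = 0; // the group index the result as a whole hands out
		for (Partition p : partitions) {
			if (p.addConsumerGroup() != index) {
				throw new RuntimeException("Inconsistent consumer mapping between partitions.");
			}
		}
		System.out.println("all partitions agreed on consumer group " + index);
	}
}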

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 92ee9ac..7d06dca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -18,56 +18,63 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
 import java.util.ArrayList;
 import java.util.List;
 
 public class IntermediateResultPartition {
-	
-	private final IntermediateResult totalResut;
-	
+
+	private final IntermediateResult totalResult;
+
 	private final ExecutionVertex producer;
-	
-	private final int partition;
-	
+
+	private final int partitionNumber;
+
+	private final IntermediateResultPartitionID partitionId;
+
 	private List<List<ExecutionEdge>> consumers;
-	
-	
-	public IntermediateResultPartition(IntermediateResult totalResut, ExecutionVertex producer, int partition) {
-		this.totalResut = totalResut;
+
+	public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
+		this.totalResult = totalResult;
 		this.producer = producer;
-		this.partition = partition;
+		this.partitionNumber = partitionNumber;
 		this.consumers = new ArrayList<List<ExecutionEdge>>(0);
+		this.partitionId = new IntermediateResultPartitionID();
 	}
-	
-	
+
 	public ExecutionVertex getProducer() {
 		return producer;
 	}
-	
-	public int getPartition() {
-		return partition;
+
+	public int getPartitionNumber() {
+		return partitionNumber;
 	}
-	
+
 	public IntermediateResult getIntermediateResult() {
-		return totalResut;
+		return totalResult;
+	}
+
+	public IntermediateResultPartitionID getPartitionId() {
+		return partitionId;
 	}
-	
+
 	public List<List<ExecutionEdge>> getConsumers() {
 		return consumers;
 	}
-	
+
 	int addConsumerGroup() {
 		int pos = consumers.size();
-		
+
 		// NOTE: currently we support only one consumer per result!!!
 		if (pos != 0) {
-			throw new RuntimeException("Currenty, each intermediate result can only have one consumer.");
+			throw new RuntimeException("Currently, each intermediate result can only have one consumer.");
 		}
-		
+
 		consumers.add(new ArrayList<ExecutionEdge>());
 		return pos;
 	}
-	
+
 	void addConsumer(ExecutionEdge edge, int consumerNumber) {
 		consumers.get(consumerNumber).add(edge);
 	}
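
For reference, the shape of the consumers field above: the outer list indexes consumer
groups (at most one for now), the inner list collects that group's edges. A toy
illustration, with strings standing in for ExecutionEdge:

import java.util.ArrayList;
import java.util.List;

public class ConsumerShapeSketch {
	public static void main(String[] args) {
		List<List<String>> consumers = new ArrayList<List<String>>();
		consumers.add(new ArrayList<String>());             // addConsumerGroup() -> group 0
		consumers.get(0).add("edge -> consumer subtask 0"); // addConsumer(edge, 0)
		consumers.get(0).add("edge -> consumer subtask 1");
		System.out.println(consumers);
	}
}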

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
index 35273f4..a15acb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -40,7 +40,7 @@ import org.apache.flink.core.memory.MemorySegment;
  * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
 * writing and reading are consistent with each other (same block sizes) is up to the programmer.
  */
-public class AsynchronousBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BlockChannelReader {
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySegment, ReadRequest> implements BlockChannelReader {
 	
 	private final LinkedBlockingQueue<MemorySegment> returnSegments;
 	
@@ -72,20 +72,7 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<ReadReque
 	 */
 	@Override
 	public void readBlock(MemorySegment segment) throws IOException {
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		// the statements have to be in this order to avoid incrementing the counter
-		// after the channel has been closed
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The reader has been closed.");
-		}
-		this.requestQueue.add(new SegmentReadRequest(this, segment));
+		addRequest(new SegmentReadRequest(this, segment));
 	}
 	
 	/**
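
The class comment of AsynchronousBlockReader leaves block-size consistency to the
caller. A hedged usage sketch of the writer/reader pair built by the IOManager (the
factory and return-queue signatures appear in this commit; writeBlock on
BlockChannelWriter and close() are assumptions based on the implementations shown
here):

import java.util.concurrent.LinkedBlockingQueue;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;

public class BlockRoundTripSketch {
	// Writes one block and reads it back with a segment of the same size.
	// The channel never validates block sizes; that is up to the caller.
	static void roundTrip(IOManager ioManager, MemorySegment segment) throws Exception {
		FileIOChannel.ID channel = ioManager.createChannel();

		LinkedBlockingQueue<MemorySegment> writeQueue = new LinkedBlockingQueue<MemorySegment>();
		BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel, writeQueue);
		writer.writeBlock(segment);                // asynchronous write request
		MemorySegment written = writeQueue.take(); // returned once the write completed
		writer.close();                            // assumed: waits for pending requests

		LinkedBlockingQueue<MemorySegment> readQueue = new LinkedBlockingQueue<MemorySegment>();
		BlockChannelReader reader = ioManager.createBlockChannelReader(channel, readQueue);
		reader.readBlock(written);                 // reuse the same-sized segment
		MemorySegment read = readQueue.take();
		reader.close();
	}
}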

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
index 6b6fb36..c9fbdd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
  * An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
  * and calls a callback once they have been handled.
  */
-public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<WriteRequest> implements BlockChannelWriterWithCallback {
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<MemorySegment, WriteRequest> implements BlockChannelWriterWithCallback {
 	
 	/**
 	 * Creates a new asynchronous block writer for the given channel.
@@ -37,7 +37,7 @@ public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChann
 	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
 	 */
 	protected AsynchronousBlockWriterWithCallback(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
-			RequestDoneCallback callback) throws IOException
+			RequestDoneCallback<MemorySegment> callback) throws IOException
 	{
 		super(channelID, requestQueue, callback, true);
 	}
@@ -51,17 +51,6 @@ public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChann
 	 */
 	@Override
 	public void writeBlock(MemorySegment segment) throws IOException {
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The writer has been closed.");
-		}
-		this.requestQueue.add(new SegmentWriteRequest(this, segment));
+		addRequest(new SegmentWriteRequest(this, segment));
 	}
 }
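
The callback variant hands each finished segment to a RequestDoneCallback. A minimal
sketch of such a callback, modeled on the requestSuccessful/requestFailed calls in
AsynchronousFileIOChannel below (the generic parameter follows this commit):

import java.io.IOException;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;

public class CountingCallbackSketch implements RequestDoneCallback<MemorySegment> {

	private int completed;
	private IOException firstError;

	@Override
	public void requestSuccessful(MemorySegment segment) {
		completed++; // one more write request has been handled
	}

	@Override
	public void requestFailed(MemorySegment segment, IOException ex) {
		if (firstError == null) {
			firstError = ex; // remember only the first failure
		}
	}
}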

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
index 048f82f..1abe449 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  *
  */
-public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BulkBlockChannelReader {
+public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<MemorySegment, ReadRequest> implements BulkBlockChannelReader {
 	
 	private final ArrayList<MemorySegment> returnBuffers;
 	
@@ -59,18 +59,7 @@ public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadR
 	}
 	
 	private void readBlock(MemorySegment segment) throws IOException {
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The reader has been closed.");
-		}
-		this.requestQueue.add(new SegmentReadRequest(this, segment));
+		addRequest(new SegmentReadRequest(this, segment));
 	}
 	
 	@Override
@@ -86,7 +75,7 @@ public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadR
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class CollectingCallback implements RequestDoneCallback {
+	private static final class CollectingCallback implements RequestDoneCallback<MemorySegment> {
 		
 		private final ArrayList<MemorySegment> list;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 098b334..89ebb25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.core.memory.MemorySegment;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A base class for readers and writers that accept read or write requests for whole blocks.
  * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
@@ -34,7 +36,7 @@ import org.apache.flink.core.memory.MemorySegment;
  * 
 * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt>) issued by this channel to the I/O threads.
  */
-public abstract class AsynchronousFileIOChannel<R extends IORequest> extends AbstractFileIOChannel {
+public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends AbstractFileIOChannel {
 	
 	/** The lock that is used during closing to synchronize the thread that waits for all
 	 * requests to be handled with the asynchronous I/O thread. */
@@ -47,7 +49,7 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
 	protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
 	
 	/** Handler for completed requests */
-	protected final RequestDoneCallback resultHander;
+	protected final RequestDoneCallback<T> resultHandler;
 	
 	/** An exception that was encountered by the asynchronous request handling thread.*/
 	protected volatile IOException exception;
@@ -70,16 +72,12 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
 	 * @throws IOException Thrown, if the channel could not be opened.
 	 */
 	protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue, 
-			RequestDoneCallback callback, boolean writeEnabled) throws IOException
+			RequestDoneCallback<T> callback, boolean writeEnabled) throws IOException
 	{
 		super(channelID, writeEnabled);
-		
-		if (requestQueue == null) {
-			throw new NullPointerException();
-		}
-		
-		this.requestQueue = requestQueue;
-		this.resultHander = callback;
+
+		this.requestQueue = checkNotNull(requestQueue);
+		this.resultHandler = checkNotNull(callback);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -106,7 +104,7 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
 			
 			try {
 				// wait until as many buffers have been returned as were written
-				// only then is everything guaranteed to be consistent.{
+				// only then is everything guaranteed to be consistent.
 				while (this.requestsNotReturned.get() > 0) {
 					try {
 						// we add a timeout here, because it is not guaranteed that the
@@ -115,7 +113,7 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
 						this.closeLock.wait(1000);
 						checkErroneous();
 					}
-					catch (InterruptedException iex) {}
+					catch (InterruptedException ignored) {}
 				}
 			}
 			finally {
@@ -167,15 +165,19 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
 	 * @param buffer The buffer to be processed.
 	 * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
 	 */
-	final void handleProcessedBuffer(MemorySegment buffer, IOException ex) {
+	protected final void handleProcessedBuffer(T buffer, IOException ex) {
+		if (buffer == null) {
+			return;
+		}
+
 		// even if the callbacks throw an error, we need to maintain our bookkeeping
 		try {
 			if (ex != null && this.exception == null) {
 				this.exception = ex;
-				this.resultHander.requestFailed(buffer, ex);
+				this.resultHandler.requestFailed(buffer, ex);
 			}
 			else {
-				this.resultHander.requestSuccessful(buffer);
+				this.resultHandler.requestSuccessful(buffer);
 			}
 		}
 		finally {
@@ -193,6 +195,21 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
 			}
 		}
 	}
+
+	protected final void addRequest(R request) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+
+		// write the current buffer and get the next one
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("I/O channel already closed. Could not fulfill: " + request);
+		}
+		this.requestQueue.add(request);
+	}
 }
 
 //--------------------------------------------------------------------------------------------
@@ -202,11 +219,11 @@ public abstract class AsynchronousFileIOChannel<R extends IORequest> extends Abs
  */
 final class SegmentReadRequest implements ReadRequest {
 	
-	private final AsynchronousFileIOChannel<ReadRequest> channel;
+	private final AsynchronousFileIOChannel<MemorySegment, ReadRequest> channel;
 	
 	private final MemorySegment segment;
 	
-	protected SegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
+	protected SegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) {
 		this.channel = targetChannel;
 		this.segment = segment;
 	}
@@ -238,11 +255,11 @@ final class SegmentReadRequest implements ReadRequest {
  */
 final class SegmentWriteRequest implements WriteRequest {
 	
-	private final AsynchronousFileIOChannel<WriteRequest> channel;
+	private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
 	
 	private final MemorySegment segment;
 	
-	protected SegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
+	protected SegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
 		this.channel = targetChannel;
 		this.segment = segment;
 	}
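
The new addRequest above consolidates bookkeeping that the readBlock/writeBlock
implementations previously duplicated. Why the order of operations matters, as a
standalone sketch (an AtomicInteger and a closed flag standing in for the channel
state):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestBookkeepingSketch {

	private final AtomicInteger requestsNotReturned = new AtomicInteger(0);
	private volatile boolean closed;

	// Increment first, then re-check for closure, and roll back on failure.
	// Checking before incrementing would let a concurrent close() observe a
	// zero counter, finish closing, and then miss the request we were about
	// to enqueue.
	void addRequest(Runnable request) throws IOException {
		requestsNotReturned.incrementAndGet();
		if (closed) {
			requestsNotReturned.decrementAndGet();
			throw new IOException("I/O channel already closed. Could not fulfill: " + request);
		}
		// hand the request over to the asynchronous I/O thread here
	}
}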

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index f77d9c4..ff2fb85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -18,20 +18,20 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * The facade for the provided I/O manager services.
  */
 public abstract class IOManager {
-	
+
 	/** Logging */
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
@@ -40,17 +40,17 @@ public abstract class IOManager {
 
 	/** A random number generator for the anonymous ChannelIDs. */
 	private final Random random;
-	
+
 	/** The number of the next path to use. */
 	private volatile int nextPath;
-	
+
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
 	// -------------------------------------------------------------------------
 
 	/**
 	 * Constructs a new IOManager.
-	 * 
+	 *
 	 * @param paths
 	 *        the basic directory paths for files underlying anonymous channels.
 	 */
@@ -64,10 +64,10 @@ public abstract class IOManager {
 	 * Close method, marks the I/O manager as closed.
 	 */
 	public abstract void shutdown();
-	
+
 	/**
 	 * Utility method to check whether the IO manager has been properly shut down.
-	 * 
+	 *
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
 	public abstract boolean isProperlyShutDown();
@@ -75,11 +75,11 @@ public abstract class IOManager {
 	// ------------------------------------------------------------------------
 	//                          Channel Instantiations
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
 	 * invocations of this method spread the channels evenly across the different directories.
-	 * 
+	 *
 	 * @return A channel to a temporary directory.
 	 */
 	public FileIOChannel.ID createChannel() {
@@ -90,22 +90,21 @@ public abstract class IOManager {
 	/**
 	 * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
 	 * across the temporary file directories.
-	 * 
+	 *
 	 * @return An enumerator for channels.
 	 */
 	public FileIOChannel.Enumerator createChannelEnumerator() {
 		return new FileIOChannel.Enumerator(this.paths, this.random);
 	}
 
-	
 	// ------------------------------------------------------------------------
 	//                        Reader / Writer instantiations
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer adds the
 	 * written segment to its return-queue afterwards (to allow for asynchronous implementations).
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
@@ -113,11 +112,11 @@ public abstract class IOManager {
 	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
 		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
-	
+
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer adds the
 	 * written segment to the given queue (to allow for asynchronous implementations).
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the written buffers into.
 	 * @return A block channel writer that writes to the given channel.
@@ -125,24 +124,24 @@ public abstract class IOManager {
 	 */
 	public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
 				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
-	
+
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer calls the given callback
 	 * after the I/O operation has been performed (successfully or unsuccessfully), to allow
 	 * for asynchronous implementations.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
-	 * @param callback The callback to be called for 
+	 * @param callback The callback to be called for each completed write request.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException;
-	
+	public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
+
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushes
 	 * full memory segments (with the read data) to its "return queue", to allow for asynchronous read
 	 * implementations.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to read from.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
@@ -150,41 +149,41 @@ public abstract class IOManager {
 	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
 		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
-	
+
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushes the full segments
 	 * to the given queue, to allow for asynchronous implementations.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to read from.
 	 * @param returnQueue The queue to put the full buffers into.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, 
+	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
-	
+
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
 	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many
-	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be 
+	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be
 	 * obtained from the reader.
 	 * <p>
-	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a  
+	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a
 	 * {@link BlockChannelReader} should be used.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to read from.
 	 * @param targetSegments The list to take the segments from into which to read the data.
 	 * @param numBlocks The number of blocks in the channel to read.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, 
+	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
 			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
-	
-	// ------------------------------------------------------------------------
-	//                          Utilities
-	// ------------------------------------------------------------------------
-	
+
+	// ========================================================================
+	//                             Utilities
+	// ========================================================================
+
 	protected int getNextPathNum() {
 		final int next = this.nextPath;
 		final int newNext = next + 1;
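
getNextPathNum is cut off above; presumably it advances nextPath round-robin across
the temp directories. A hedged reconstruction of that step as a standalone sketch (the
wrap-around is an assumption, not visible in this message):

public class RoundRobinPathSketch {

	private final String[] paths = { "/tmp/dir0", "/tmp/dir1", "/tmp/dir2" };

	// Volatile like the original field: concurrent callers may occasionally
	// pick the same path, which is harmless for spreading temp files.
	private volatile int nextPath;

	protected int getNextPathNum() {
		final int next = this.nextPath;
		final int newNext = next + 1;
		this.nextPath = (newNext >= paths.length) ? 0 : newNext;
		return next;
	}
}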

