flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-1489] Replaces blocking scheduleOrUpdateConsumers message calls with asynchronous futures. Buffers PartitionInfos at the JobManager in case that the respective consumer has not been scheduled.
Date Wed, 11 Feb 2015 16:34:44 GMT
Repository: flink
Updated Branches:
  refs/heads/master f577c9ef4 -> aedbacfc5


[FLINK-1489] Replaces blocking scheduleOrUpdateConsumers message calls with asynchronous futures.
Buffers PartitionInfos at the JobManager in case the respective consumer has not yet been
scheduled.

Conflicts:
	flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala

Adds TaskUpdate message aggregation before sending the messages to the TaskManagers.

This closes #378


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aedbacfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aedbacfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aedbacfc

Branch: refs/heads/master
Commit: aedbacfc553a7226bbf06410232fb2a2b315cf8b
Parents: f577c9e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Feb 6 15:13:28 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Feb 11 17:34:18 2015 +0100

----------------------------------------------------------------------
 .../deployment/PartialPartitionInfo.java        | 102 ++++++++++++
 .../flink/runtime/deployment/PartitionInfo.java |   3 +-
 .../flink/runtime/executiongraph/Execution.java | 157 ++++++++++++++-----
 .../runtime/executiongraph/ExecutionGraph.java  |  14 +-
 .../runtime/executiongraph/ExecutionVertex.java |  27 +++-
 .../runtime/io/network/NetworkEnvironment.java  |  16 +-
 .../partition/IntermediateResultPartition.java  |  44 +++---
 .../apache/flink/runtime/client/JobClient.scala |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  16 +-
 .../runtime/messages/JobmanagerMessages.scala   |   1 -
 .../flink/runtime/messages/Messages.scala       |  25 +++
 .../runtime/messages/TaskManagerMessages.scala  |  28 +++-
 .../flink/runtime/taskmanager/TaskManager.scala |  53 ++++---
 13 files changed, 380 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
new file mode 100644
index 0000000..a27c976
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+/**
+ * This class contains the partial partition info which is created if the consumer instance is not
+ * yet clear. Once the instance on which the consumer runs is known, the complete partition info
+ * can be computed.
+ */
+public class PartialPartitionInfo {
+	private final IntermediateDataSetID intermediateDataSetID;
+
+	private final IntermediateResultPartitionID partitionID;
+
+	private final ExecutionAttemptID producerExecutionID;
+
+	private final InstanceConnectionInfo producerInstanceConnectionInfo;
+
+	private final int partitionConnectionIndex;
+
+	public PartialPartitionInfo(IntermediateDataSetID intermediateDataSetID,
+								IntermediateResultPartitionID partitionID,
+								ExecutionAttemptID executionID,
+								InstanceConnectionInfo producerInstanceConnectionInfo,
+								int partitionConnectionIndex) {
+		this.intermediateDataSetID = intermediateDataSetID;
+		this.partitionID = partitionID;
+		this.producerExecutionID = executionID;
+		this.producerInstanceConnectionInfo = producerInstanceConnectionInfo;
+		this.partitionConnectionIndex = partitionConnectionIndex;
+	}
+
+	public PartitionInfo createPartitionInfo(Execution consumerExecution) throws IllegalStateException
{
+		if(consumerExecution != null){
+			PartitionInfo.PartitionLocation producerLocation;
+
+			RemoteAddress resolvedProducerAddress;
+
+			if(consumerExecution.getAssignedResourceLocation().equals(
+					producerInstanceConnectionInfo)) {
+				resolvedProducerAddress = null;
+				producerLocation = PartitionInfo.PartitionLocation.LOCAL;
+			} else {
+				resolvedProducerAddress = new RemoteAddress(producerInstanceConnectionInfo,
+						partitionConnectionIndex);
+
+				producerLocation = PartitionInfo.PartitionLocation.REMOTE;
+			}
+
+			return new PartitionInfo(partitionID, producerExecutionID, producerLocation,
+					resolvedProducerAddress);
+
+		} else {
+			throw new RuntimeException("Cannot create partition info, because consumer execution "
+
+					"is null.");
+		}
+	}
+
+	public IntermediateDataSetID getIntermediateDataSetID() {
+		return intermediateDataSetID;
+	}
+
+	public static PartialPartitionInfo fromEdge(final ExecutionEdge edge){
+		IntermediateResultPartition partition = edge.getSource();
+		IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
+
+		IntermediateDataSetID intermediateDataSetID = partition.getIntermediateResult().getId();
+
+		Execution producer = partition.getProducer().getCurrentExecutionAttempt();
+		ExecutionAttemptID producerExecutionID = producer.getAttemptId();
+
+		return new PartialPartitionInfo(intermediateDataSetID, partitionID, producerExecutionID,
+				producer.getAssignedResourceLocation(),
+				partition.getIntermediateResult().getConnectionIndex());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index dd2c063..333340a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -130,7 +130,8 @@ public class PartitionInfo implements IOReadableWritable, Serializable
{
 
 		// The producer needs to be running, otherwise the consumer might request a partition,
 		// which has not been registered yet.
-		if (producerSlot != null && producerState == ExecutionState.RUNNING) {
+		if (producerSlot != null && (producerState == ExecutionState.RUNNING ||
+			producerState == ExecutionState.FINISHED)) {
 			if (producerSlot.getInstance().equals(consumerSlot.getInstance())) {
 				producerLocation = PartitionLocation.LOCAL;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8c5c673..c4e5abf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,17 +20,23 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
+import static akka.dispatch.Futures.future;
+
+import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -39,6 +45,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
@@ -46,7 +53,10 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -316,21 +326,26 @@ public class Execution implements Serializable {
 						markFailed(failure);
 					}
 					else {
-						TaskManagerMessages.TaskOperationResult result = (TaskManagerMessages.TaskOperationResult)
success;
 						if (success == null) {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult
was null"));
 						}
-						else if (!result.executionID().equals(attemptId)) {
-							markFailed(new Exception("Answer execution id does not match the request execution
id."));
-						}
-						else if (result.success()) {
-							switchToRunning();
-						}
-						else {
-							// deployment failed :(
-							markFailed(new Exception("Failed to deploy the task " +
-									getVertexWithAttempt() + " to slot " + slot + ": " + result
-									.description()));
+
+						if(success instanceof TaskOperationResult) {
+							TaskOperationResult result = (TaskOperationResult) success;
+
+							if (!result.executionID().equals(attemptId)) {
+								markFailed(new Exception("Answer execution id does not match the request execution
id."));
+							} else if (result.success()) {
+								switchToRunning();
+							} else {
+								// deployment failed :(
+								markFailed(new Exception("Failed to deploy the task " +
+										getVertexWithAttempt() + " to slot " + slot + ": " + result
+										.description()));
+							}
+						}else {
+							markFailed(new Exception("Failed to deploy the task to slot " + slot +
+									": Response was not of type TaskOperationResult"));
 						}
 					}
 				}
@@ -401,11 +416,9 @@ public class Execution implements Serializable {
 	}
 
 	// TODO This leads to many unnecessary RPC calls in most cases
-	boolean scheduleOrUpdateConsumers(List<List<ExecutionEdge>> consumers) throws
Exception {
-		boolean success = true;
-
+	void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> consumers) {
 		if (consumers.size() != 1) {
-			throw new IllegalStateException("Only one consumer is supported currently.");
+			fail(new IllegalStateException("Only one consumer is supported currently."));
 		}
 
 		final List<ExecutionEdge> consumer = consumers.get(0);
@@ -416,32 +429,66 @@ public class Execution implements Serializable {
 			final ExecutionState consumerState = consumerVertex.getExecutionState();
 
 			if (consumerState == CREATED) {
-				if (state == RUNNING) {
-					if (!consumerVertex.scheduleForExecution(consumerVertex.getExecutionGraph().getScheduler(),
false)) {
-						success = false;
+				consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+
+				future(new Callable<Boolean>(){
+					@Override
+					public Boolean call() throws Exception {
+						try {
+							consumerVertex.scheduleForExecution(
+								consumerVertex.getExecutionGraph().getScheduler(), false);
+						} catch (Exception exception) {
+							fail(new IllegalStateException("Could not schedule consumer " +
+									"vertex " + consumerVertex, exception));
+						}
+
+						return true;
 					}
-				}
-				else {
-					success = false;
+				}, AkkaUtils.globalExecutionContext());
+
+				// double check to resolve race conditions
+				if(consumerVertex.getExecutionState() == RUNNING){
+					consumerVertex.sendPartitionInfos();
 				}
 			}
 			else if (consumerState == RUNNING) {
 				SimpleSlot consumerSlot = consumerVertex.getCurrentAssignedResource();
-				ExecutionAttemptID consumerExecutionId = consumerVertex.getCurrentExecutionAttempt().getAttemptId();
+				ExecutionAttemptID consumerExecutionId = consumerVertex.
+						getCurrentExecutionAttempt().getAttemptId();
+
+				IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
+				int connectionIndex = edge.getSource().getIntermediateResult().getConnectionIndex();
+
+				PartitionInfo.PartitionLocation producerLocation;
+				RemoteAddress producerAddress = null;
+
+				if(consumerSlot.getInstance().getInstanceConnectionInfo().equals(
+						getAssignedResourceLocation())) {
+					producerLocation = PartitionInfo.PartitionLocation.LOCAL;
+				} else {
+					producerLocation = PartitionInfo.PartitionLocation.REMOTE;
+					producerAddress = new RemoteAddress(getAssignedResourceLocation(),
+							connectionIndex);
+				}
 
-				PartitionInfo partitionInfo = PartitionInfo.fromEdge(edge, consumerSlot);
+				PartitionInfo partitionInfo = new PartitionInfo(partitionID, attemptId,
+						producerLocation, producerAddress);
 
-				if (!sendUpdateTaskRpcCall(consumerSlot, consumerExecutionId, edge.getSource().getIntermediateResult().getId(),
partitionInfo)) {
-					success = false;
-				}
+				TaskManagerMessages.UpdateTask updateTaskMessage =
+						new TaskManagerMessages.UpdateTaskSinglePartitionInfo(consumerExecutionId,
+								edge.getSource().getIntermediateResult().getId(), partitionInfo);
 
+				sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
 			}
 			else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
-				success = false;
+				consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+
+				// double check to resolve race conditions
+				if(consumerVertex.getExecutionState() == RUNNING){
+					consumerVertex.sendPartitionInfos();
+				}
 			}
 		}
-
-		return success;
 	}
 
 	/**
@@ -548,6 +595,32 @@ public class Execution implements Serializable {
 		}
 	}
 
+	void sendPartitionInfos() {
+		ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos =
+				vertex.getPartialPartitionInfos();
+
+		// check if the ExecutionVertex has already been archived and thus cleared the
+		// partial partition infos queue
+		if(partialPartitionInfos != null) {
+
+			PartialPartitionInfo partialPartitionInfo;
+
+			List<IntermediateDataSetID> resultIDs = new ArrayList<IntermediateDataSetID>();
+			List<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
+
+			while ((partialPartitionInfo = partialPartitionInfos.poll()) != null) {
+				resultIDs.add(partialPartitionInfo.getIntermediateDataSetID());
+				partitionInfos.add(partialPartitionInfo.createPartitionInfo(this));
+			}
+
+			TaskManagerMessages.UpdateTask updateTaskMessage =
+					TaskManagerMessages.createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
+							partitionInfos);
+
+			sendUpdateTaskRpcCall(assignedResource, updateTaskMessage);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Internal Actions
 	// --------------------------------------------------------------------------------------------
@@ -614,6 +687,7 @@ public class Execution implements Serializable {
 	private boolean switchToRunning() {
 
 		if (transitionState(DEPLOYING, RUNNING)) {
+			sendPartitionInfos();
 			return true;
 		}
 		else {
@@ -671,7 +745,7 @@ public class Execution implements Serializable {
 				if(failure != null){
 					fail(new Exception("Task could not be canceled.", failure));
 				}else{
-					TaskManagerMessages.TaskOperationResult result = (TaskManagerMessages.TaskOperationResult)success;
+					TaskOperationResult result = (TaskOperationResult)success;
 					if(!result.success()){
 						LOG.debug("Cancel task call did not find task. Probably akka message call" +
 								" race.");
@@ -700,21 +774,20 @@ public class Execution implements Serializable {
 		}
 	}
 
-	private boolean sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, final ExecutionAttemptID
executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws
Exception {
+	private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
+									final TaskManagerMessages.UpdateTask updateTaskMsg) {
 		final Instance instance = consumerSlot.getInstance();
 
-		final TaskManagerMessages.TaskOperationResult result = AkkaUtils.ask(
-				instance.getTaskManager(), new TaskManagerMessages.UpdateTask(executionId, resultId,
partitionInfo), timeout);
+		Future<Object> futureUpdate = Patterns.ask(instance.getTaskManager(), updateTaskMsg,
+				new Timeout(timeout));
 
-		if (!result.success()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Update task {} was unsuccessful (maybe an RPC race): {}", executionId, result.description());
+		futureUpdate.onFailure(new OnFailure() {
+			@Override
+			public void onFailure(Throwable failure) throws Throwable {
+				fail(new IllegalStateException("Update task on instance " + instance +
+						" failed due to:", failure));
 			}
-
-			return false;
-		}
-
-		return true;
+		}, AkkaUtils.globalExecutionContext());
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3f857e5..c1f45e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -521,14 +521,20 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	public boolean scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex)
throws Exception {
+	public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex)
{
 		Execution execution = currentExecutions.get(executionId);
 
+
 		if (execution == null) {
-			throw new IllegalStateException("Cannot find execution for execution ID " + executionId);
+			fail(new IllegalStateException("Cannot find execution for execution ID " +
+					executionId));
+		}
+		else if(execution.getVertex() == null){
+			fail(new IllegalStateException("Execution with execution ID " + executionId +
+				" has no vertex assigned."));
+		} else {
+			execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
 		}
-
-		return execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
 	}
 
 	public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 86173da..58515ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
@@ -46,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -72,6 +74,8 @@ public class ExecutionVertex implements Serializable {
 	private IntermediateResultPartition[] resultPartitions;
 	
 	private ExecutionEdge[][] inputEdges;
+
+	private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;
 	
 	private final int subTaskIndex;
 	
@@ -109,6 +113,9 @@ public class ExecutionVertex implements Serializable {
 		}
 
 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
+
+		this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();
+
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
 		this.currentExecution = new Execution(this, 0, createTimestamp, timeout);
@@ -196,6 +203,10 @@ public class ExecutionVertex implements Serializable {
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}
+
+	public ConcurrentLinkedQueue<PartialPartitionInfo> getPartialPartitionInfos() {
+		return partialPartitionInfos;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
@@ -408,18 +419,18 @@ public class ExecutionVertex implements Serializable {
 	 * Schedules or updates the {@link IntermediateResultPartition} consumer
 	 * tasks of the intermediate result partition with the given index.
 	 */
-	boolean scheduleOrUpdateConsumers(int partitionIndex) throws Exception {
+	void scheduleOrUpdateConsumers(int partitionIndex) {
 		checkElementIndex(partitionIndex, resultPartitions.length);
 
 		IntermediateResultPartition partition = resultPartitions[partitionIndex];
 
-		return currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
+		currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
 	}
 	
 	/**
 	 * This method cleans fields that are irrelevant for the archived execution attempt.
 	 */
-	public void prepareForArchiving() {
+	public void prepareForArchiving() throws IllegalStateException {
 		Execution execution = currentExecution;
 		ExecutionState state = execution.getState();
 
@@ -440,6 +451,16 @@ public class ExecutionVertex implements Serializable {
 		this.resultPartitions = null;
 		this.inputEdges = null;
 		this.locationConstraintInstances = null;
+		this.partialPartitionInfos.clear();
+		this.partialPartitionInfos = null;
+	}
+
+	public void cachePartitionInfo(PartialPartitionInfo partitionInfo){
+		this.partialPartitionInfos.add(partitionInfo);
+	}
+
+	void sendPartitionInfos() {
+		currentExecution.sendPartitionInfos();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index aa6c64c..74a448a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network;
 
 import akka.actor.ActorRef;
+import akka.util.Timeout;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
@@ -47,6 +48,8 @@ public class NetworkEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
 
+	private final ActorRef taskManager;
+
 	private final ActorRef jobManager;
 
 	private final FiniteDuration jobManagerTimeout;
@@ -64,7 +67,10 @@ public class NetworkEnvironment {
 	/**
 	 * Initializes all network I/O components.
 	 */
-	public NetworkEnvironment(ActorRef jobManager, FiniteDuration jobManagerTimeout, NetworkEnvironmentConfiguration
config) throws IOException {
+	public NetworkEnvironment(ActorRef taskManager, ActorRef jobManager,
+							FiniteDuration jobManagerTimeout,
+							NetworkEnvironmentConfiguration config) throws IOException {
+		this.taskManager = checkNotNull(taskManager);
 		this.jobManager = checkNotNull(jobManager);
 		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
 
@@ -96,12 +102,16 @@ public class NetworkEnvironment {
 		}
 	}
 
+	public ActorRef getTaskManager() {
+		return taskManager;
+	}
+
 	public ActorRef getJobManager() {
 		return jobManager;
 	}
 
-	public FiniteDuration getJobManagerTimeout() {
-		return jobManagerTimeout;
+	public Timeout getJobManagerTimeout() {
+		return new Timeout(jobManagerTimeout);
 	}
 
 	public void registerTask(Task task) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
index 6acfbce..7a987c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import akka.actor.ActorRef;
+import akka.dispatch.OnFailure;
+import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
@@ -36,10 +39,11 @@ import org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQue
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
+import scala.concurrent.Future;
 
 import java.io.IOException;
 
@@ -243,28 +247,26 @@ public class IntermediateResultPartition implements BufferPoolOwner
{
 	}
 
 	private void scheduleOrUpdateConsumers() throws IOException {
-		while (!isReleased) {
-			final JobManagerMessages.ConsumerNotificationResult result = AkkaUtils.ask(
-					networkEnvironment.getJobManager(),
-					new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, producerExecutionId, partitionIndex),
+		if(!isReleased){
+			ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId,
+					producerExecutionId, partitionIndex);
+
+			Future<Object> futureResponse = Patterns.ask(networkEnvironment.getJobManager(),
msg,
 					networkEnvironment.getJobManagerTimeout());
 
-			if (result.success()) {
-				return;
-			}
-			else {
-				Option<Throwable> error = result.error();
-				if (error.isDefined()) {
-					throw new IOException(error.get().getMessage(), error.get());
-				}
-			}
+			futureResponse.onFailure(new OnFailure(){
+				@Override
+				public void onFailure(Throwable failure) throws Throwable {
+					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
 
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException e) {
-				throw new IOException("Unexpected interruption during consumer schedule or update.",
e);
-			}
+					// Fail task at the TaskManager
+					FailTask failMsg = new FailTask(producerExecutionId,
+							new RuntimeException("Could not schedule or update consumers at " +
+									"the JobManager.", failure));
+
+					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+				}
+			}, AkkaUtils.globalExecutionContext());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 676ddda..191e11a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -171,11 +171,10 @@ object JobClient{
     var waitForAnswer = true
     var answer: JobExecutionResult = null
 
-    val result =(jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))(
-      AkkaUtils.INF_TIMEOUT).mapTo[JobExecutionResult]
-
     while(waitForAnswer) {
       try {
+        val result =(jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))(
+          timeout).mapTo[JobExecutionResult]
         answer = Await.result(result, timeout)
         waitForAnswer = false
       } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 756cb4b..eea7cae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,14 +20,16 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.{IOException, File}
 import java.net.InetSocketAddress
+import akka.actor.Status.Failure
 import akka.actor._
 import akka.pattern.ask
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
@@ -295,13 +297,13 @@ Actor with ActorLogMessages with ActorLogging {
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          sender ! ConsumerNotificationResult(
-            executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
-          )
+          sender ! Acknowledge
+          executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
         case None =>
-          log.error("Cannot find execution graph for job ID {}.", jobId)
-          sender ! ConsumerNotificationResult(success = false, Some(
-            new IllegalStateException("Cannot find execution graph for job ID " + jobId)))
+          log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
+            jobId);
+          sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " +
+            jobId + " to schedule or update consumers."))
       }
 
     case ReportAccumulatorResult(accumulatorEvent) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index 5189a02..6aacf10 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState
  * The job manager specific messages
  */
 object JobManagerMessages {
-
   /**
    * Submits a job to the job manager. If [[registerForEvents]] is true,
    * then the sender will be registered as listener for the state change messages. If [[detached]]

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
new file mode 100644
index 0000000..4f27761
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+object Messages {
+
+  case object Acknowledge
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 6e3a3b1..f5a2cd4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -58,12 +58,32 @@ object TaskManagerMessages {
   case class UnregisterTask(executionID: ExecutionAttemptID)
 
   /**
-   * Updates the reader identified by [[resultId]] of the task identified by
-   * [[executionId]] from the task manager.
+   * Updates the reader of the task identified by
+   * [[executionID]] from the task manager.
    */
-  case class UpdateTask(executionId: ExecutionAttemptID,
+  sealed trait UpdateTask{
+    def executionID: ExecutionAttemptID
+  }
+
+  case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
                         resultId: IntermediateDataSetID,
-                        partitionInfo: PartitionInfo)
+                        partitionInfo: PartitionInfo) extends UpdateTask
+
+  case class UpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
+                                              partitionInfos: Seq[(IntermediateDataSetID,
+                                                PartitionInfo)]) extends UpdateTask
+
+  def createUpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
+                                             resultIDs: java.util.List[IntermediateDataSetID],
+                                             partitionInfos: java.util.List[PartitionInfo]):
+  UpdateTaskMultiplePartitionInfos = {
+    require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as" +
+      "partitionInfos.")
+
+    import scala.collection.JavaConverters.asScalaBufferConverter
+    new UpdateTaskMultiplePartitionInfos(executionID,
+      resultIDs.asScala.zip(partitionInfos.asScala))
+  }
 
   /**
    * Fails all intermediate result partitions identified by [[executionID]] from the task manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index de3fa7a..eba688a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
+import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
 RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
@@ -251,8 +252,13 @@ import scala.collection.JavaConverters._
     case SubmitTask(tdd) =>
       submitTask(tdd)
 
-    case UpdateTask(executionId, resultId, partitionInfo) =>
-      updateTask(executionId, resultId, partitionInfo)
+    case updateMsg:UpdateTask =>
+      updateMsg match {
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTask(executionID, List((resultID, partitionInfo)))
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTask(executionID, partitionInfos)
+      }
 
     case CancelTask(executionID) =>
       runningTasks.get(executionID) match {
@@ -462,43 +468,45 @@ import scala.collection.JavaConverters._
     registered = false
   }
 
-  private def updateTask(executionId: ExecutionAttemptID, resultId: IntermediateDataSetID,
-                         partitionInfo: PartitionInfo): Unit = {
-
-    partitionInfo.getProducerLocation match {
-      case PartitionInfo.PartitionLocation.UNKNOWN =>
-        sender ! TaskOperationResult(executionId, success = false,
-          "Tried to update task with UNKNOWN channel.")
+  private def updateTask(executionId: ExecutionAttemptID,
+                         partitionInfos: Seq[(IntermediateDataSetID, PartitionInfo)]): Unit = {
 
-      case _ =>
-        runningTasks.get(executionId) match {
-          case Some(task) =>
-            Option(task.getEnvironment.getReaderById(resultId)) match {
+    runningTasks.get(executionId) match {
+      case Some(task) =>
+        val errors = partitionInfos flatMap {
+          case (resultID, partitionInfo) =>
+            Option(task.getEnvironment.getReaderById(resultID)) match {
               case Some(reader) =>
                 Future {
                   try {
                     reader.updateInputChannel(partitionInfo)
                   } catch {
                     case t: Throwable =>
-                      log.error("Task update failure: {} Trying to cancel task.", t.getMessage)
+                      log.error(t, "Task update failure. Trying to cancel task.")
 
                       try {
                         task.cancelExecution()
                       } catch {
                         case t: Throwable =>
-                          log.error("Failed canceling task with execution ID {} after task" +
-                            "update failure: {}.", executionId, t.getMessage)
+                          log.error(t, "Failed canceling task with execution ID {} after task" +
+                            "update failure..", executionId)
                       }
                   }
                 }
-                sender ! TaskOperationResult(executionId, success = true)
-              case None => sender ! TaskOperationResult(executionId, success = false,
-                s"No reader with ID $resultId  was found.")
+                None
+              case None => Some(s"No reader with ID $resultID for task $executionId was found.")
             }
+        }
 
-          case None => sender ! TaskOperationResult(executionId, success = false,
-            s"No task with execution ID $executionId was found.")
+        if(errors.isEmpty) {
+          sender ! Acknowledge
+        } else {
+          sender ! Failure(new IllegalStateException(errors.mkString("\n")))
         }
+      case None =>
+        log.info("Could not update task with ID {}, because it is no longer running.",
+          executionId)
+        sender ! Acknowledge
     }
   }
 
@@ -544,7 +552,8 @@ import scala.collection.JavaConverters._
     }
 
     try {
-      networkEnvironment = Some(new NetworkEnvironment(currentJobManager, timeout, networkConfig))
+      networkEnvironment = Some(new NetworkEnvironment(self, currentJobManager, timeout,
+        networkConfig))
     } catch {
       case ioe: IOException =>
         log.error(ioe, "Failed to instantiate network environment.")


Mime
View raw message