flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [7/7] flink git commit: [FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.
Date Mon, 13 Jul 2015 16:54:10 GMT
[FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager.

Replaces AkkaUtils.globalExecutionContext with instance dependent ExecutionContext.

This closes #893


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ccb5fdb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ccb5fdb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ccb5fdb

Branch: refs/heads/master
Commit: 2ccb5fdb47aa0e3766fd7fbd17a41feaca29fcbc
Parents: aa5e5b3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Jul 7 11:41:44 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Jul 13 17:54:31 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  89 ++-
 .../runtime/executiongraph/ExecutionGraph.java  |  55 +-
 .../runtime/executiongraph/ExecutionVertex.java |  40 +-
 .../runtime/instance/AkkaInstanceGateway.java   | 111 ++++
 .../apache/flink/runtime/instance/Instance.java |  34 +-
 .../flink/runtime/instance/InstanceGateway.java |  82 +++
 .../flink/runtime/instance/InstanceManager.java |   3 +-
 .../runtime/io/network/NetworkEnvironment.java  |  34 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  11 +-
 .../jobmanager/web/SetupInfoServlet.java        |   3 +-
 .../web-docs-infoserver/js/taskmanager.js       |   2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   2 -
 .../flink/runtime/jobmanager/JobManager.scala   |  95 +++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  20 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  34 +-
 .../executiongraph/AllVerticesIteratorTest.java |   2 +
 .../ExecutionGraphConstructionTest.java         |  54 +-
 .../ExecutionGraphDeploymentTest.java           |  59 +-
 .../executiongraph/ExecutionGraphTestUtils.java | 121 ++--
 .../ExecutionStateProgressTest.java             |  31 +-
 .../ExecutionVertexCancelTest.java              | 563 ++++++++-----------
 .../ExecutionVertexDeploymentTest.java          | 115 +---
 .../ExecutionVertexSchedulingTest.java          |  35 +-
 .../executiongraph/LocalInputSplitsTest.java    |  53 +-
 .../executiongraph/PointwisePatternTest.java    |  50 +-
 .../TerminalStateDeadlockTest.java              |  10 +-
 .../VertexLocationConstraintTest.java           |  91 +--
 .../executiongraph/VertexSlotSharingTest.java   |   7 +-
 .../instance/BaseTestingInstanceGateway.java    |  94 ++++
 .../runtime/instance/DummyInstanceGateway.java  |  57 ++
 .../flink/runtime/instance/InstanceTest.java    |   7 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   4 +-
 .../io/network/NetworkEnvironmentTest.java      |   6 +-
 .../ScheduleWithCoLocationHintTest.java         |  37 +-
 .../scheduler/SchedulerIsolatedTasksTest.java   |  31 +-
 .../scheduler/SchedulerSlotSharingTest.java     |  51 +-
 .../scheduler/SchedulerTestUtils.java           |   4 +-
 ...askManagerComponentsStartupShutdownTest.java |   6 +-
 .../ExecutionGraphRestartTest.scala             |  40 +-
 .../TaskManagerLossFailsTasksTest.scala         |  33 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   8 +-
 .../runtime/testingUtils/TestingCluster.scala   |  30 +-
 .../testingUtils/TestingJobManager.scala        |  15 +-
 .../TestingJobManagerMessages.scala             |   3 +-
 .../runtime/testingUtils/TestingUtils.scala     |  62 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   6 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  36 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  12 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  30 +-
 .../flink/yarn/ApplicationMasterActor.scala     |   2 +-
 50 files changed, 1399 insertions(+), 981 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 76a58e8..af67c3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,13 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -32,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -50,6 +47,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -131,9 +129,20 @@ public class Execution implements Serializable {
 	
 	private SerializedValue<StateHandle<?>> operatorState;
 
+	/** The execution context which is used to execute futures. */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private ExecutionContext executionContext;
+
 	// --------------------------------------------------------------------------------------------
 	
-	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
+	public Execution(
+			ExecutionContext executionContext,
+			ExecutionVertex vertex,
+			int attemptNumber,
+			long startTimestamp,
+			FiniteDuration timeout) {
+		this.executionContext = checkNotNull(executionContext);
+
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
 		
@@ -200,6 +209,8 @@ public class Execution implements Serializable {
 		}
 		assignedResource = null;
 
+		executionContext = null;
+
 		partialInputChannelDeploymentDescriptors.clear();
 		partialInputChannelDeploymentDescriptors = null;
 	}
@@ -338,8 +349,9 @@ public class Execution implements Serializable {
 			vertex.getExecutionGraph().registerExecution(this);
 
 			final Instance instance = slot.getInstance();
-			final Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
-					new SubmitTask(deployment), new Timeout(timeout));
+			final InstanceGateway gateway = instance.getInstanceGateway();
+
+			final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
 
 			deployAction.onComplete(new OnComplete<Object>(){
 
@@ -366,7 +378,7 @@ public class Execution implements Serializable {
 						}
 					}
 				}
-			}, AkkaUtils.globalExecutionContext());
+			}, executionContext);
 		}
 		catch (Throwable t) {
 			markFailed(t);
@@ -402,7 +414,7 @@ public class Execution implements Serializable {
 			else if (current == FINISHED || current == FAILED) {
 				// nothing to do any more. finished failed before it could be cancelled.
 				// in any case, the task is removed from the TaskManager already
-				sendFailIntermediateResultPartitionsRPCCall();
+				sendFailIntermediateResultPartitionsRpcCall();
 
 				return;
 			}
@@ -485,7 +497,7 @@ public class Execution implements Serializable {
 
 						return true;
 					}
-				}, AkkaUtils.globalExecutionContext());
+				}, executionContext);
 
 				// double check to resolve race conditions
 				if(consumerVertex.getExecutionState() == RUNNING){
@@ -533,7 +545,7 @@ public class Execution implements Serializable {
 					final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
 							consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
 
-					sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
+					sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
 				}
 				// ----------------------------------------------------------------
 				// Consumer is scheduled or deploying => cache input channel
@@ -689,11 +701,12 @@ public class Execution implements Serializable {
 				inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
 			}
 
-			UpdatePartitionInfo updateTaskMessage =
-					createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
-							inputChannelDeploymentDescriptors);
+			UpdatePartitionInfo updateTaskMessage = createUpdateTaskMultiplePartitionInfos(
+				attemptId,
+				resultIDs,
+				inputChannelDeploymentDescriptors);
 
-			sendUpdateTaskRpcCall(assignedResource, updateTaskMessage);
+			sendUpdatePartitionInfoRpcCall(assignedResource, updateTaskMessage);
 		}
 	}
 
@@ -804,14 +817,23 @@ public class Execution implements Serializable {
 		}
 	}
 
+	/**
+	 * This method sends a CancelTask message to the instance of the assigned slot.
+	 *
+	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
+	 */
 	private void sendCancelRpcCall() {
 		final SimpleSlot slot = this.assignedResource;
 
 		if (slot != null) {
 
-			Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
-							CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
-					AkkaUtils.globalExecutionContext(), timeout);
+			final InstanceGateway gateway = slot.getInstance().getInstanceGateway();
+
+			Future<Object> cancelResult = gateway.retry(
+				new CancelTask(attemptId),
+				NUM_CANCEL_CALL_TRIES,
+				timeout,
+				executionContext);
 
 			cancelResult.onComplete(new OnComplete<Object>() {
 
@@ -827,35 +849,40 @@ public class Execution implements Serializable {
 						}
 					}
 				}
-			}, AkkaUtils.globalExecutionContext());
+			}, executionContext);
 		}
 	}
 
-	private void sendFailIntermediateResultPartitionsRPCCall() {
+	private void sendFailIntermediateResultPartitionsRpcCall() {
 		final SimpleSlot slot = this.assignedResource;
 
 		if (slot != null) {
 			final Instance instance = slot.getInstance();
 
 			if (instance.isAlive()) {
-				try {
-					// TODO For some tests this could be a problem when querying too early if all resources were released
-					instance.getTaskManager().tell(new FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
-				} catch (Throwable t) {
-					fail(new Exception("Intermediate result partition could not be failed.", t));
-				}
+				final InstanceGateway gateway = instance.getInstanceGateway();
+
+				// TODO For some tests this could be a problem when querying too early if all resources were released
+				gateway.tell(new FailIntermediateResultPartitions(attemptId));
 			}
 		}
 	}
 
-	private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
-										final UpdatePartitionInfo updateTaskMsg) {
+	/**
+	 * Sends an UpdatePartitionInfo message to the instance of the consumerSlot.
+	 *
+	 * @param consumerSlot Slot to whose instance the message will be sent
+	 * @param updatePartitionInfo UpdatePartitionInfo message
+	 */
+	private void sendUpdatePartitionInfoRpcCall(
+			final SimpleSlot consumerSlot,
+			final UpdatePartitionInfo updatePartitionInfo) {
 
 		if (consumerSlot != null) {
 			final Instance instance = consumerSlot.getInstance();
+			final InstanceGateway gateway = instance.getInstanceGateway();
 
-			Future<Object> futureUpdate = Patterns.ask(instance.getTaskManager(), updateTaskMsg,
-					new Timeout(timeout));
+			Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
 
 			futureUpdate.onFailure(new OnFailure() {
 				@Override
@@ -863,7 +890,7 @@ public class Execution implements Serializable {
 					fail(new IllegalStateException("Update task on instance " + instance +
 							" failed due to:", failure));
 				}
-			}, AkkaUtils.globalExecutionContext());
+			}, executionContext);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 84cbab7..47b7ae2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -24,7 +24,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -45,6 +44,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -197,6 +197,10 @@ public class ExecutionGraph implements Serializable {
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointCoordinator checkpointCoordinator;
 
+	/** The execution context which is used to execute futures. */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private ExecutionContext executionContext;
+
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private ExecutionConfig executionConfig;
 
@@ -207,17 +211,38 @@ public class ExecutionGraph implements Serializable {
 	/**
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
-	ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
-		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader());
-	}
-
-	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout,
-			List<BlobKey> requiredJarFiles, ClassLoader userClassLoader) {
-
-		if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
+	ExecutionGraph(
+			ExecutionContext executionContext,
+			JobID jobId,
+			String jobName,
+			Configuration jobConfig,
+			FiniteDuration timeout) {
+		this(
+			executionContext,
+			jobId,
+			jobName,
+			jobConfig,
+			timeout,
+			new ArrayList<BlobKey>(),
+			ExecutionGraph.class.getClassLoader()
+		);
+	}
+
+	public ExecutionGraph(
+			ExecutionContext executionContext,
+			JobID jobId,
+			String jobName,
+			Configuration jobConfig,
+			FiniteDuration timeout,
+			List<BlobKey> requiredJarFiles,
+			ClassLoader userClassLoader) {
+
+		if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
 			throw new NullPointerException();
 		}
 
+		this.executionContext = executionContext;
+
 		this.jobID = jobId;
 		this.jobName = jobName;
 		this.jobConfiguration = jobConfig;
@@ -451,6 +476,15 @@ public class ExecutionGraph implements Serializable {
 		return this.stateTimestamps[status.ordinal()];
 	}
 
+	/**
+	 * Returns the ExecutionContext associated with this ExecutionGraph.
+	 *
+	 * @return ExecutionContext associated with this ExecutionGraph
+	 */
+	public ExecutionContext getExecutionContext() {
+		return executionContext;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -629,6 +663,7 @@ public class ExecutionGraph implements Serializable {
 		userClassLoader = null;
 		scheduler = null;
 		checkpointCoordinator = null;
+		executionContext = null;
 
 		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
 			vertex.prepareForArchiving();
@@ -719,7 +754,7 @@ public class ExecutionGraph implements Serializable {
 									restart();
 									return null;
 								}
-							}, AkkaUtils.globalExecutionContext());
+							}, executionContext);
 							break;
 						}
 						else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a70fa7d..f9001cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -30,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -101,14 +100,20 @@ public class ExecutionVertex implements Serializable {
 
 	// --------------------------------------------------------------------------------------------
 
-	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
-						IntermediateResult[] producedDataSets, FiniteDuration timeout) {
+	public ExecutionVertex(
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex,
+			IntermediateResult[] producedDataSets,
+			FiniteDuration timeout) {
 		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
 	}
 
-	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
-						IntermediateResult[] producedDataSets, FiniteDuration timeout,
-						long createTimestamp) {
+	public ExecutionVertex(
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex,
+			IntermediateResult[] producedDataSets,
+			FiniteDuration timeout,
+			long createTimestamp) {
 		this.jobVertex = jobVertex;
 		this.subTaskIndex = subTaskIndex;
 
@@ -125,7 +130,12 @@ public class ExecutionVertex implements Serializable {
 
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
-		this.currentExecution = new Execution(this, 0, createTimestamp, timeout);
+		this.currentExecution = new Execution(
+			getExecutionGraph().getExecutionContext(),
+			this,
+			0,
+			createTimestamp,
+			timeout);
 
 		// create a co-location scheduling hint, if necessary
 		CoLocationGroup clg = jobVertex.getCoLocationGroup();
@@ -416,8 +426,12 @@ public class ExecutionVertex implements Serializable {
 
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
-				currentExecution = new Execution(this, execution.getAttemptNumber()+1,
-						System.currentTimeMillis(), timeout);
+				currentExecution = new Execution(
+					getExecutionGraph().getExecutionContext(),
+					this,
+					execution.getAttemptNumber()+1,
+					System.currentTimeMillis(),
+					timeout);
 
 				CoLocationGroup grp = jobVertex.getCoLocationGroup();
 				if (grp != null) {
@@ -455,9 +469,9 @@ public class ExecutionVertex implements Serializable {
 			
 			// send only if we actually have a target
 			if (slot != null) {
-				ActorRef taskManager = slot.getInstance().getTaskManager();
-				if (taskManager != null) {
-					taskManager.tell(message, ActorRef.noSender());
+				InstanceGateway gateway = slot.getInstance().getInstanceGateway();
+				if (gateway != null) {
+					gateway.tell(message);
 				}
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
new file mode 100644
index 0000000..b7d60c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * InstanceGateway implementation which uses Akka to communicate with remote instances.
+ */
+public class AkkaInstanceGateway implements InstanceGateway {
+
+	/** ActorRef of the remote instance */
+	private final ActorRef taskManager;
+
+	public AkkaInstanceGateway(ActorRef taskManager) {
+		this.taskManager = taskManager;
+	}
+
+	/**
+	 * Sends a message asynchronously and returns its response. The response to the message is
+	 * returned as a future.
+	 *
+	 * @param message Message to be sent
+	 * @param timeout Timeout until the Future is completed with an AskTimeoutException
+	 * @return Future which contains the response to the sent message
+	 */
+	@Override
+	public Future<Object> ask(Object message, FiniteDuration timeout) {
+		return Patterns.ask(taskManager, message, new Timeout(timeout));
+	}
+
+	/**
+	 * Sends a message asynchronously without a result.
+	 *
+	 * @param message Message to be sent
+	 */
+	@Override
+	public void tell(Object message) {
+		taskManager.tell(message, ActorRef.noSender());
+	}
+
+	/**
+	 * Forwards a message. For the receiver of this message it looks as if sender has sent the
+	 * message.
+	 *
+	 * @param message Message to be sent
+	 * @param sender Sender of the forwarded message
+	 */
+	@Override
+	public void forward(Object message, ActorRef sender) {
+		taskManager.tell(message, sender);
+	}
+
+	/**
+	 * Retries to send asynchronously a message up to numberRetries times. The response to this
+	 * message is returned as a future. The message is re-sent if the number of retries is not yet
+	 * exceeded and if an exception occurred while sending it.
+	 *
+	 * @param message Message to be sent
+	 * @param numberRetries Number of times to retry sending the message
+	 * @param timeout Timeout for each sending attempt
+	 * @param executionContext ExecutionContext which is used to send the message multiple times
+	 * @return Future of the response to the sent message
+	 */
+	@Override
+	public Future<Object> retry(
+			Object message,
+			int numberRetries,
+			FiniteDuration timeout,
+			ExecutionContext executionContext) {
+
+		return AkkaUtils.retry(
+			taskManager,
+			message,
+			numberRetries,
+			executionContext,
+			timeout);
+	}
+
+	/**
+	 * Returns the ActorPath of the remote instance.
+	 *
+	 * @return ActorPath of the remote instance.
+	 */
+	@Override
+	public String path() {
+		return taskManager.path().toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 39caf08..1c44b5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.slf4j.Logger;
@@ -43,8 +41,8 @@ public class Instance {
 	/** The lock on which to synchronize allocations and failure state changes */
 	private final Object instanceLock = new Object();
 
-	/** The actor ref to the task manager represented by this taskManager. */
-	private final ActorRef taskManager;
+	/** The instacne gateway to communicate with the instance */
+	private final InstanceGateway instanceGateway;
 
 	/** The instance connection information for the data transfer. */
 	private final InstanceConnectionInfo connectionInfo;
@@ -81,15 +79,19 @@ public class Instance {
 	/**
 	 * Constructs an instance reflecting a registered TaskManager.
 	 *
-	 * @param taskManager The actor reference of the represented task manager.
+	 * @param instanceGateway The instance gateway to communicate with the remote instance
 	 * @param connectionInfo The remote connection where the task manager receives requests.
 	 * @param id The id under which the taskManager is registered.
 	 * @param resources The resources available on the machine.
 	 * @param numberOfSlots The number of task slots offered by this taskManager.
 	 */
-	public Instance(ActorRef taskManager, InstanceConnectionInfo connectionInfo, InstanceID id,
-					HardwareDescription resources, int numberOfSlots) {
-		this.taskManager = taskManager;
+	public Instance(
+			InstanceGateway instanceGateway,
+			InstanceConnectionInfo connectionInfo,
+			InstanceID id,
+			HardwareDescription resources,
+			int numberOfSlots) {
+		this.instanceGateway = instanceGateway;
 		this.connectionInfo = connectionInfo;
 		this.instanceId = id;
 		this.resources = resources;
@@ -327,12 +329,14 @@ public class Instance {
 		}
 	}
 
-	public ActorRef getTaskManager() {
-		return taskManager;
-	}
-
-	public String getPath(){
-		return taskManager.path().toString();
+	/**
+	 * Returns the InstanceGateway of this Instance. This gateway can be used to communicate with
+	 * it.
+	 *
+	 * @return InstanceGateway associated with this instance
+	 */
+	public InstanceGateway getInstanceGateway() {
+		return instanceGateway;
 	}
 
 	public InstanceConnectionInfo getInstanceConnectionInfo() {
@@ -386,6 +390,6 @@ public class Instance {
 	@Override
 	public String toString() {
 		return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
-				numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender"));
+				numberOfSlots, (instanceGateway != null ? instanceGateway.path() : "No instance gateway"));
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
new file mode 100644
index 0000000..a30b2f6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorRef;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Interface to abstract the communication with an Instance.
+ *
+ * It allows to avoid direct interaction with an ActorRef.
+ */
+public interface InstanceGateway {
+
+	/**
+	 * Sends a message asynchronously and returns its response. The response to the message is
+	 * returned as a future.
+	 *
+	 * @param message Message to be sent
+	 * @param timeout Timeout until the Future is completed with an AskTimeoutException
+	 * @return Future which contains the response to the sent message
+	 */
+	Future<Object> ask(Object message, FiniteDuration timeout);
+
+	/**
+	 * Sends a message asynchronously without a result.
+	 *
+	 * @param message Message to be sent
+	 */
+	void tell(Object message);
+
+	/**
+	 * Forwards a message. For the receiver of this message it looks as if sender has sent the
+	 * message.
+	 *
+	 * @param message Message to be sent
+	 * @param sender Sender of the forwarded message
+	 */
+	void forward(Object message, ActorRef sender);
+
+	/**
+	 * Retries to send asynchronously a message up to numberRetries times. The response to this
+	 * message is returned as a future. The message is re-sent if the number of retries is not yet
+	 * exceeded and if an exception occurred while sending it.
+	 *
+	 * @param message Message to be sent
+	 * @param numberRetries Number of times to retry sending the message
+	 * @param timeout Timeout for each sending attempt
+	 * @param executionContext ExecutionContext which is used to send the message multiple times
+	 * @return Future of the response to the sent message
+	 */
+	Future<Object> retry(
+			Object message,
+			int numberRetries,
+			FiniteDuration timeout,
+			ExecutionContext executionContext);
+
+	/**
+	 * Returns the path of the remote instance.
+	 *
+	 * @return Path of the remote instance.
+	 */
+	String path();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index c1800bd..4f6c7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -149,8 +149,9 @@ public class InstanceManager {
 				id = new InstanceID();
 			} while (registeredHostsById.containsKey(id));
 
+			InstanceGateway instanceGateway = new AkkaInstanceGateway(taskManager);
 
-			Instance host = new Instance(taskManager, connectionInfo, id, resources, numberOfSlots);
+			Instance host = new Instance(instanceGateway, connectionInfo, id, resources, numberOfSlots);
 
 			registeredHostsById.put(id, host);
 			registeredHostsByConnection.put(taskManager, host);

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index c082c6a..0ffc889 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -23,7 +23,6 @@ import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -89,11 +89,20 @@ public class NetworkEnvironment {
 	private boolean isShutdown;
 
 	/**
+	 * ExecutionEnvironment which is used to execute remote calls with the
+	 * {@link JobManagerResultPartitionConsumableNotifier}
+	 */
+	private final ExecutionContext executionContext;
+
+	/**
 	 * Initializes all network I/O components.
 	 */
-	public NetworkEnvironment(FiniteDuration jobManagerTimeout,
-								NetworkEnvironmentConfiguration config) throws IOException {
+	public NetworkEnvironment(
+		ExecutionContext executionContext,
+		FiniteDuration jobManagerTimeout,
+		NetworkEnvironmentConfiguration config) throws IOException {
 
+		this.executionContext = executionContext;
 		this.configuration = checkNotNull(config);
 		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
 
@@ -182,7 +191,10 @@ public class NetworkEnvironment {
 				this.partitionManager = new ResultPartitionManager();
 				this.taskEventDispatcher = new TaskEventDispatcher();
 				this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
-													jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout));
+					executionContext,
+					jobManagerRef,
+					taskManagerRef,
+					new Timeout(jobManagerTimeout));
 
 				this.partitionStateChecker = new JobManagerPartitionStateChecker(
 						jobManagerRef, taskManagerRef);
@@ -414,6 +426,12 @@ public class NetworkEnvironment {
 	 */
 	private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
 
+		/**
+		 * {@link ExecutionContext} which is used for the failure handler of {@link ScheduleOrUpdateConsumers}
+		 * messages.
+		 */
+		private final ExecutionContext executionContext;
+
 		private final ActorRef jobManager;
 
 		private final ActorRef taskManager;
@@ -421,8 +439,12 @@ public class NetworkEnvironment {
 		private final Timeout jobManagerMessageTimeout;
 
 		public JobManagerResultPartitionConsumableNotifier(
-				ActorRef jobManager, ActorRef taskManager, Timeout jobManagerMessageTimeout) {
+			ExecutionContext executionContext,
+			ActorRef jobManager,
+			ActorRef taskManager,
+			Timeout jobManagerMessageTimeout) {
 
+			this.executionContext = executionContext;
 			this.jobManager = jobManager;
 			this.taskManager = taskManager;
 			this.jobManagerMessageTimeout = jobManagerMessageTimeout;
@@ -448,7 +470,7 @@ public class NetworkEnvironment {
 
 					taskManager.tell(failMsg, ActorRef.noSender());
 				}
-			}, AkkaUtils.globalExecutionContext());
+			}, executionContext);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 940082e..cb99e52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -37,7 +37,6 @@ import akka.dispatch.Futures;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
@@ -50,6 +49,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
@@ -95,12 +95,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	/** The number of slot allocations where locality could not be respected */
 	private int nonLocalizedAssignments;
 
+	/** The ExecutionContext which is used to execute newSlotAvailable futures. */
+	private final ExecutionContext executionContext;
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new scheduler.
 	 */
-	public Scheduler() {}
+	public Scheduler(ExecutionContext executionContext) {
+		this.executionContext = executionContext;
+	}
 	
 	/**
 	 * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
@@ -519,7 +524,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				handleNewSlot();
 				return null;
 			}
-		}, AkkaUtils.globalExecutionContext());
+		}, executionContext);
 	}
 	
 	private void handleNewSlot() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 4e028d4..567d15a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -146,8 +146,7 @@ public class SetupInfoServlet extends HttpServlet {
 				long time = new Date().getTime() - instance.getLastHeartBeat();
 
 				try {
-					objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
-					objInner.put("ipcPort", instance.getTaskManager().path().address().hostPort());
+					objInner.put("path", instance.getInstanceGateway().path());
 					objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
 					objInner.put("timeSinceLastHeartbeat", time / 1000);
 					objInner.put("slotsNumber", instance.getTotalNumberOfSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
index 1ea9a41..68e4278 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js
@@ -224,7 +224,7 @@ function processTMdata(json) {
                                "</div>";
 
             var content = "<tr id=\""+tmRowIdCssName+"\">" +
-		                "<td style=\"width:20%\">"+tm.inetAdress+" <br> IPC Port: "+tm.ipcPort+", Data Port: "+tm.dataPort+"</td>" + // first row: TaskManager
+		                "<td style=\"width:20%\">"+tm.path+" <br> Data Port: "+tm.dataPort+"</td>" + // first row: TaskManager
 		                "<td id=\""+tmRowIdCssName+"-memory\">"+tmMemoryBox+"</td>" + // second row: memory statistics
 		                "<td id=\""+tmRowIdCssName+"-info\"><i>Loading Information</i></td>" + // Information
 		                "</tr>";

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7ffaddd..d38e503 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -41,8 +41,6 @@ object AkkaUtils {
 
   val INF_TIMEOUT = 21474835 seconds
 
-  var globalExecutionContext: ExecutionContext = ExecutionContext.global
-
   /**
    * Creates a local actor system without remoting.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index dc1599a..3b4ce15 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -60,6 +60,7 @@ import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 
 /**
@@ -89,16 +90,18 @@ import scala.language.postfixOps
  * - [[JobStatusChanged]] indicates that the status of job (RUNNING, CANCELING, FINISHED, etc.) has
  * changed. This message is sent by the ExecutionGraph.
  */
-class JobManager(protected val flinkConfiguration: Configuration,
-                 protected val instanceManager: InstanceManager,
-                 protected val scheduler: FlinkScheduler,
-                 protected val libraryCacheManager: BlobLibraryCacheManager,
-                 protected val archive: ActorRef,
-                 protected val accumulatorManager: AccumulatorManager,
-                 protected val defaultExecutionRetries: Int,
-                 protected val delayBetweenRetries: Long,
-                 protected val timeout: FiniteDuration,
-                 protected val mode: StreamingMode)
+class JobManager(
+    protected val flinkConfiguration: Configuration,
+    protected val executionContext: ExecutionContext,
+    protected val instanceManager: InstanceManager,
+    protected val scheduler: FlinkScheduler,
+    protected val libraryCacheManager: BlobLibraryCacheManager,
+    protected val archive: ActorRef,
+    protected val accumulatorManager: AccumulatorManager,
+    protected val defaultExecutionRetries: Int,
+    protected val delayBetweenRetries: Long,
+    protected val timeout: FiniteDuration,
+    protected val mode: StreamingMode)
   extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** List of current jobs running jobs */
@@ -117,7 +120,7 @@ class JobManager(protected val flinkConfiguration: Configuration,
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
-      _.getTaskManager ! Disconnect("JobManager is shutting down")
+      _.getInstanceGateway().tell(Disconnect("JobManager is shutting down"))
     }
 
     archive ! PoisonPill
@@ -136,7 +139,6 @@ class JobManager(protected val flinkConfiguration: Configuration,
     }
 
     log.debug(s"Job manager ${self.path} is completely stopped.")
-
   }
 
   /**
@@ -411,8 +413,8 @@ class JobManager(protected val flinkConfiguration: Configuration,
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
 
     case RequestStackTrace(instanceID) =>
-      val taskManager = instanceManager.getRegisteredInstanceById(instanceID).getTaskManager
-      taskManager forward SendStackTrace
+      val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway
+      gateway.forward(SendStackTrace, sender)
 
     case Terminated(taskManager) =>
       if (instanceManager.isRegistered(taskManager)) {
@@ -480,10 +482,18 @@ class JobManager(protected val flinkConfiguration: Configuration,
         }
 
         // see if there already exists an ExecutionGraph for the corresponding job ID
-        executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
-          (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-            jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
-            JobInfo(sender(), System.currentTimeMillis())))._1
+        executionGraph = currentJobs.getOrElseUpdate(
+          jobGraph.getJobID,
+          (new ExecutionGraph(
+            executionContext,
+            jobGraph.getJobID,
+            jobGraph.getName,
+            jobGraph.getJobConfiguration,
+            timeout,
+            jobGraph.getUserJarBlobKeys,
+            userCodeLoader),
+            JobInfo(sender(), System.currentTimeMillis()))
+        )._1
 
         // configure the execution graph
         val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
@@ -1046,8 +1056,8 @@ object JobManager {
    * @param configuration The configuration from which to parse the config values.
    * @return The members for a default JobManager.
    */
-  def createJobManagerComponents(configuration: Configuration) :
-    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
+  def createJobManagerComponents(configuration: Configuration)
+    : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
       Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -1083,6 +1093,8 @@ object JobManager {
 
     val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
 
+    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
@@ -1091,7 +1103,7 @@ object JobManager {
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler()
+      scheduler = new FlinkScheduler(executionContext)
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -1114,8 +1126,16 @@ object JobManager {
       }
     }
 
-    (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
-      executionRetries, delayBetweenRetries, timeout, archiveCount)
+    (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiveProps,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      archiveCount)
   }
 
   /**
@@ -1154,9 +1174,16 @@ object JobManager {
                             archiverActorName: Option[String],
                             streamingMode: StreamingMode): (ActorRef, ActorRef) = {
 
-    val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
-      executionRetries, delayBetweenRetries,
-      timeout, _) = createJobManagerComponents(configuration)
+    val (executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiveProps,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      _) = createJobManagerComponents(configuration)
 
     // start the archiver with the given name, or without (avoid name conflicts)
     val archiver: ActorRef = archiverActorName match {
@@ -1164,9 +1191,19 @@ object JobManager {
       case None => actorSystem.actorOf(archiveProps)
     }
 
-    val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
-        libraryCacheManager, archiver, accumulatorManager, executionRetries,
-        delayBetweenRetries, timeout, streamingMode)
+    val jobManagerProps = Props(
+      classOf[JobManager],
+      configuration,
+      executionContext,
+      instanceManager,
+      scheduler,
+      libraryCacheManager,
+      archiver,
+      accumulatorManager,
+      executionRetries,
+      delayBetweenRetries,
+      timeout,
+      streamingMode)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 73a37de..49c701e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{Future, Await}
+import scala.concurrent.{ExecutionContext, Future, Await}
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -48,9 +48,10 @@ import scala.concurrent.{Future, Await}
  * @param streamingMode True, if the system should be started in streaming mode, false if
  *                      in pure batch mode.
  */
-abstract class FlinkMiniCluster(val userConfiguration: Configuration,
-                                val singleActorSystem: Boolean,
-                                val streamingMode: StreamingMode) {
+abstract class FlinkMiniCluster(
+    val userConfiguration: Configuration,
+    val singleActorSystem: Boolean,
+    val streamingMode: StreamingMode) {
 
   def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
          = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
@@ -157,7 +158,7 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
 
     val future = gracefulStop(jobManagerActor, timeout)
 
-    implicit val executionContext = AkkaUtils.globalExecutionContext
+    implicit val executionContext = ExecutionContext.global
 
     Await.ready(Future.sequence(future +: futures), timeout)
 
@@ -179,7 +180,7 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
   }
 
   def waitForTaskManagersToBeRegistered(): Unit = {
-    implicit val executionContext = AkkaUtils.globalExecutionContext
+    implicit val executionContext = ExecutionContext.global
 
     val futures = taskManagerActors map {
       taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
@@ -196,8 +197,11 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
   }
   
   @throws(classOf[JobExecutionException])
-  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean, timeout: FiniteDuration)
-                                                                 : SerializedJobExecutionResult = {
+  def submitJobAndWait(
+      jobGraph: JobGraph,
+      printUpdates: Boolean,
+      timeout: FiniteDuration)
+    : SerializedJobExecutionResult = {
 
     val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
     else JobClient.startJobClientActorSystem(configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 520decd..44a0b04 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, EnvironmentInfor
 
 import scala.concurrent._
 import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success}
 import scala.collection.JavaConverters._
 
@@ -1363,14 +1364,16 @@ object TaskManager {
   @throws(classOf[IllegalConfigurationException])
   @throws(classOf[IOException])
   @throws(classOf[Exception])
-  def startTaskManagerComponentsAndActor(configuration: Configuration,
-                                         actorSystem: ActorSystem,
-                                         taskManagerHostname: String,
-                                         taskManagerActorName: Option[String],
-                                         jobManagerPath: Option[String],
-                                         localTaskManagerCommunication: Boolean,
-                                         streamingMode: StreamingMode,
-                                         taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
+  def startTaskManagerComponentsAndActor(
+      configuration: Configuration,
+      actorSystem: ActorSystem,
+      taskManagerHostname: String,
+      taskManagerActorName: Option[String],
+      jobManagerPath: Option[String],
+      localTaskManagerCommunication: Boolean,
+      streamingMode: StreamingMode,
+      taskManagerClass: Class[_ <: TaskManager])
+    : ActorRef = {
 
     // get and check the JobManager config
     val jobManagerAkkaUrl: String = jobManagerPath.getOrElse {
@@ -1380,17 +1383,20 @@ object TaskManager {
     }
 
     val (taskManagerConfig : TaskManagerConfiguration,
-         netConfig: NetworkEnvironmentConfiguration,
-         connectionInfo: InstanceConnectionInfo)
-
-         = parseTaskManagerConfiguration(configuration, taskManagerHostname,
-                                         localTaskManagerCommunication)
+      netConfig: NetworkEnvironmentConfiguration,
+      connectionInfo: InstanceConnectionInfo
+    ) = parseTaskManagerConfiguration(
+      configuration,
+      taskManagerHostname,
+      localTaskManagerCommunication)
 
     // pre-start checks
     checkTempDirs(taskManagerConfig.tmpDirPaths)
 
+    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+
     // we start the network first, to make sure it can allocate its buffers first
-    val network = new NetworkEnvironment(taskManagerConfig.timeout, netConfig)
+    val network = new NetworkEnvironment(executionContext, taskManagerConfig.timeout, netConfig)
 
     // computing the amount of memory to use depends on how much memory is available
     // it strictly needs to happen AFTER the network stack has been initialized

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 693e014..1e66d81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -43,6 +44,7 @@ public class AllVerticesIteratorTest {
 			v4.setParallelism(2);
 			
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
+			Mockito.when(eg.getExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
 					
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
 					AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index a4bd03c..a47ea77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -99,7 +100,12 @@ public class ExecutionGraphConstructionTest {
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -137,7 +143,12 @@ public class ExecutionGraphConstructionTest {
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -198,7 +209,12 @@ public class ExecutionGraphConstructionTest {
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -446,7 +462,12 @@ public class ExecutionGraphConstructionTest {
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -496,7 +517,12 @@ public class ExecutionGraphConstructionTest {
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
-		ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout());
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				jobId,
+				jobName,
+				cfg,
+				AkkaUtils.getDefaultTimeout());
 		try {
 			eg.attachJobGraph(ordered);
 			fail("Attached wrong jobgraph");
@@ -551,7 +577,11 @@ public class ExecutionGraphConstructionTest {
 			
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jobId,
+					jobName,
+					cfg,
 					AkkaUtils.getDefaultTimeout());
 			try {
 				eg.attachJobGraph(ordered);
@@ -591,7 +621,11 @@ public class ExecutionGraphConstructionTest {
 			
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jobId,
+					jobName,
+					cfg,
 					AkkaUtils.getDefaultTimeout());
 
 			try {
@@ -657,7 +691,11 @@ public class ExecutionGraphConstructionTest {
 			
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
-			ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg,
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jobId,
+					jobName,
+					cfg,
 					AkkaUtils.getDefaultTimeout());
 			eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 03a41b4..cff7146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,12 +30,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -51,29 +45,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionGraphDeploymentTest {
 
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup() {
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
-	}
-
 	@Test
 	public void testBuildDeploymentDescriptor() {
 		try {
-			TestingUtils.setCallingThreadDispatcher(system);
 			final JobID jobId = new JobID();
 
 			final JobVertexID jid1 = new JobVertexID();
@@ -100,7 +78,11 @@ public class ExecutionGraphDeploymentTest {
 			v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
-			ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(),
+			ExecutionGraph eg = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jobId,
+					"some job",
+					new Configuration(),
 					AkkaUtils.getDefaultTimeout());
 
 			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
@@ -110,15 +92,9 @@ public class ExecutionGraphDeploymentTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
 			ExecutionVertex vertex = ejv.getTaskVertices()[3];
 
-			// create synchronous task manager
-			final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system,
-					Props.create(ExecutionGraphTestUtils
-							.SimpleAcknowledgingTaskManager.class));
-
-			ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager tm = (ExecutionGraphTestUtils
-					.SimpleAcknowledgingTaskManager) simpleTaskManager.underlyingActor();
+			ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext());
 
-			final Instance instance = getInstance(simpleTaskManager);
+			final Instance instance = getInstance(instanceGateway);
 
 			final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
 
@@ -128,7 +104,7 @@ public class ExecutionGraphDeploymentTest {
 
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 
-			TaskDeploymentDescriptor descr = tm.lastTDD;
+			TaskDeploymentDescriptor descr = instanceGateway.lastTDD;
 			assertNotNull(descr);
 
 			assertEquals(jobId, descr.getJobID());
@@ -152,9 +128,6 @@ public class ExecutionGraphDeploymentTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-		finally {
-			TestingUtils.setGlobalExecutionContext();
-		}
 	}
 
 	@Test
@@ -307,19 +280,23 @@ public class ExecutionGraphDeploymentTest {
 		v2.setInvokableClass(RegularPactTask.class);
 
 		// execution graph that executes actions synchronously
-		ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(),
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.directExecutionContext(),
+				jobId,
+				"some job",
+				new Configuration(),
 				AkkaUtils.getDefaultTimeout());
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		// create a mock taskmanager that accepts deployment calls
-		ActorRef tm = system.actorOf(Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-
-		Scheduler scheduler = new Scheduler();
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		for (int i = 0; i < dop1 + dop2; i++) {
-			scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(tm));
+			scheduler.newInstanceAvailable(
+					ExecutionGraphTestUtils.getInstance(
+							new ExecutionGraphTestUtils.SimpleInstanceGateway(
+									TestingUtils.directExecutionContext())));
 		}
 		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a77a09e..8a63060 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -26,17 +26,16 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.LinkedList;
 
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -49,9 +48,11 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import scala.concurrent.ExecutionContext;
 
 public class ExecutionGraphTestUtils {
 
@@ -100,103 +101,95 @@ public class ExecutionGraphTestUtils {
 	//  utility mocking methods
 	// --------------------------------------------------------------------------------------------
 
-	public static Instance getInstance(final ActorRef taskManager) throws
-			Exception {
-		return getInstance(taskManager, 1);
+	public static Instance getInstance(final InstanceGateway gateway) throws Exception {
+		return getInstance(gateway, 1);
 	}
 
-	public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception {
+	public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception {
 		HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
 		InetAddress address = InetAddress.getByName("127.0.0.1");
 		InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
-		
-		return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, numberOfSlots);
+
+		return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
 	}
 
-	public static class SimpleAcknowledgingTaskManager extends UntypedActor {
+	public static class SimpleInstanceGateway extends BaseTestingInstanceGateway {
 		public TaskDeploymentDescriptor lastTDD;
+
+		public SimpleInstanceGateway(ExecutionContext executionContext){
+			super(executionContext);
+		}
+
 		@Override
-		public void onReceive(Object msg) throws Exception {
-			if (msg instanceof SubmitTask) {
-				SubmitTask submitTask = (SubmitTask) msg;
+		public Object handleMessage(Object message) {
+			Object result = null;
+			if(message instanceof SubmitTask) {
+				SubmitTask submitTask = (SubmitTask) message;
 				lastTDD = submitTask.tasks();
 
-				getSender().tell(Messages.getAcknowledge(), getSelf());
-			} else if (msg instanceof CancelTask) {
-				CancelTask cancelTask = (CancelTask) msg;
-				getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
-			}
-			else if (msg instanceof FailIntermediateResultPartitions) {
-				getSender().tell(new Object(), getSelf());
+				result = Messages.getAcknowledge();
+			} else if(message instanceof CancelTask) {
+				CancelTask cancelTask = (CancelTask) message;
+
+				result = new TaskOperationResult(cancelTask.attemptID(), true);
+			} else if(message instanceof FailIntermediateResultPartitions) {
+				result = new Object();
 			}
+
+			return result;
 		}
 	}
 
-	public static final String ERROR_MESSAGE = "test_failure_error_message";
+	public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway {
+		public SimpleFailingInstanceGateway(ExecutionContext executionContext) {
+			super(executionContext);
+		}
 
-	public static class SimpleFailingTaskManager extends UntypedActor {
 		@Override
-		public void onReceive(Object msg) throws Exception {
-			if (msg instanceof SubmitTask) {
-				getSender().tell(new Status.Failure(new Exception(ERROR_MESSAGE)),	getSelf());
-			} else if (msg instanceof CancelTask) {
-				CancelTask cancelTask = (CancelTask) msg;
-				getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf());
+		public Object handleMessage(Object message) throws Exception {
+			if(message instanceof SubmitTask) {
+				throw new Exception(ERROR_MESSAGE);
+			} else if (message instanceof CancelTask) {
+				CancelTask cancelTask = (CancelTask) message;
+
+				return new TaskOperationResult(cancelTask.attemptID(), true);
+			} else {
+				return null;
 			}
 		}
 	}
-	
-	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
+
+	public static final String ERROR_MESSAGE = "test_failure_error_message";
+
+	public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws JobException {
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-		
-		ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration(),
+
+		ExecutionGraph graph = new ExecutionGraph(
+				executionContext,
+				new JobID(),
+				"test job",
+				new Configuration(),
 				AkkaUtils.getDefaultTimeout());
-		
+
 		ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
 				AkkaUtils.getDefaultTimeout()));
-		
+
 		Answer<Void> noop = new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) {
 				return null;
 			}
 		};
-		
+
 		doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
 		doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
 		doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-		
+
 		return ejv;
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class ActionQueue {
-		
-		private final LinkedList<Runnable> runnables = new LinkedList<Runnable>();
-		
-		public void triggerNextAction() {
-			Runnable r = runnables.remove();
-			r.run();
-		}
-
-		public void triggerLatestAction(){
-			Runnable r = runnables.removeLast();
-			r.run();
-		}
-		
-		public Runnable popNextAction() {
-			Runnable r = runnables.remove();
-			return r;
-		}
-
-		public void queueAction(Runnable r) {
-			this.runnables.add(r);
-		}
-
-		public boolean isEmpty(){
-			return runnables.isEmpty();
-		}
+	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
+		return getExecutionVertex(id, TestingUtils.defaultExecutionContext());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 7787ab4..f47e92c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -24,10 +24,6 @@ import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -37,24 +33,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionStateProgressTest {
 
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup(){
-		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
-	}
-
-	@AfterClass
-	public static void teardown(){
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
 	@Test
 	public void testAccumulatedStateFinished() {
 		try {
@@ -65,8 +47,13 @@ public class ExecutionStateProgressTest {
 			ajv.setParallelism(3);
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
-			ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration(),
+			ExecutionGraph graph = new ExecutionGraph(
+					TestingUtils.defaultExecutionContext(),
+					jid,
+					"test job",
+					new Configuration(),
 					AkkaUtils.getDefaultTimeout());
+
 			graph.attachJobGraph(Arrays.asList(ajv));
 
 			setGraphStatus(graph, JobStatus.RUNNING);
@@ -74,9 +61,11 @@ public class ExecutionStateProgressTest {
 			ExecutionJobVertex ejv = graph.getJobVertex(vid);
 
 			// mock resources and mock taskmanager
-			ActorRef taskManager = system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class));
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid);
+				SimpleSlot slot = getInstance(
+						new SimpleInstanceGateway(
+								TestingUtils.defaultExecutionContext())
+				).allocateSimpleSlot(jid);
 				ee.deployToSlot(slot);
 			}
 


Mime
View raw message