flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-4456] Replace Akka specific types by interfaces in Task
Date Thu, 08 Sep 2016 15:23:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 02b852e35 -> 0735b5b93


[FLINK-4456] Replace Akka specific types by interfaces in Task

Introduce TaskExecutionStateListener for Task

Replace JobManagerGateway in Task by InputSplitProvider and CheckpointNotifier

Replace the TaskManager ActorGateway by TaskManagerConnection in Task

Rename taskmanager.CheckpointNotifier into CheckpointResponder; rename TaskExecutionStateListener.notifyTaskExecutionState into notifyTaskExecutionStateChanged

Remove InputSplitProvider.start; add ClassLoader parameter to InputSplitProvider.getNextInputSplit

Removes the unused class InputSplitIterator.

Update InputSplitProvider JavaDocs

This closes #2456.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0735b5b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0735b5b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0735b5b9

Branch: refs/heads/master
Commit: 0735b5b935b0c0757943e2d58047afcfb9949560
Parents: 02b852e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Sep 1 14:41:44 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Sep 8 17:20:08 2016 +0200

----------------------------------------------------------------------
 .../ExecutionPlanAfterExecutionTest.java        |   5 +-
 .../jobgraph/tasks/InputSplitIterator.java      |  88 ----------------
 .../jobgraph/tasks/InputSplitProvider.java      |   5 +-
 .../flink/runtime/operators/DataSourceTask.java |   2 +-
 .../ActorGatewayCheckpointResponder.java        |  78 ++++++++++++++
 .../ActorGatewayTaskExecutionStateListener.java |  42 ++++++++
 .../ActorGatewayTaskManagerConnection.java      |  59 +++++++++++
 .../taskmanager/CheckpointResponder.java        |  63 +++++++++++
 .../runtime/taskmanager/RuntimeEnvironment.java |  21 ++--
 .../apache/flink/runtime/taskmanager/Task.java  | 105 ++++++++-----------
 .../taskmanager/TaskExecutionStateListener.java |  29 +++++
 .../taskmanager/TaskInputSplitProvider.java     |  55 +++++-----
 .../taskmanager/TaskManagerConnection.java      |  57 ++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  17 ++-
 .../testutils/MockInputSplitProvider.java       |   3 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  13 +--
 .../taskmanager/TaskInputSplitProviderTest.java |  16 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |   8 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  60 +++++++----
 .../source/InputFormatSourceFunction.java       |   2 +-
 .../source/InputFormatSourceFunctionTest.java   |   5 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  13 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java |  75 ++++---------
 23 files changed, 519 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 2bffba9..4ec0e47 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import java.io.Serializable;
+
 import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
-public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
+public class ExecutionPlanAfterExecutionTest extends TestLogger implements Serializable {
 
 	@Test
 	public void testExecuteAfterGetExecutionPlan() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
deleted file mode 100644
index a3d700a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import java.util.Iterator;
-
-import org.apache.flink.core.io.InputSplit;
-
-/**
- * The input split iterator allows a task to iterate over all input splits it is supposed to
- * consume. Internally, the input split iterator calls an {@link InputSplitProvider} on each <code>next</code> call in
- * order to facilitate lazy split assignment.
- * 
- * @param <T>
- */
-public class InputSplitIterator<T extends InputSplit> implements Iterator<T> {
-
-	/**
-	 * The {@link InputSplitProvider} that is called to provide new input splits.
-	 */
-	private final InputSplitProvider inputSplitProvider;
-
-	/**
-	 * Buffers the next input split to be returned by this iterator or <code>null</code> it no split is buffered.
-	 */
-	private T nextInputSplit = null;
-
-	/**
-	 * Constructs a new input split iterator.
-	 * 
-	 * @param inputSplitProvider
-	 *        the input split provider to be called for new input splits
-	 */
-	public InputSplitIterator(final InputSplitProvider inputSplitProvider) {
-		this.inputSplitProvider = inputSplitProvider;
-	}
-
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public boolean hasNext() {
-
-		if (this.nextInputSplit == null) {
-			this.nextInputSplit = (T) inputSplitProvider.getNextInputSplit();
-		}
-
-		return this.nextInputSplit != null;
-	}
-
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public T next() {
-		T retVal = null;
-
-		if (this.nextInputSplit == null) {
-			this.nextInputSplit = (T) inputSplitProvider.getNextInputSplit();
-		}
-
-		retVal = this.nextInputSplit;
-		this.nextInputSplit = null;
-
-		return retVal;
-	}
-
-
-	@Override
-	public void remove() {
-		throw new RuntimeException("The InputSplitIterator does not implement the remove method");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
index 5e7a40f..e0cde17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
@@ -30,9 +30,10 @@ public interface InputSplitProvider {
 
 	/**
 	 * Requests the next input split to be consumed by the calling task.
-	 * 
+	 *
+	 * @param userCodeClassLoader used to deserialize input splits
 	 * @return the next input split to be consumed by the calling task or <code>null</code> if the
 	 *         task shall not consume any further input splits.
 	 */
-	InputSplit getNextInputSplit();
+	InputSplit getNextInputSplit(ClassLoader userCodeClassLoader);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 68e29b6..c062bf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -333,7 +333,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					return true;
 				}
 				
-				InputSplit split = provider.getNextInputSplit();
+				InputSplit split = provider.getNextInputSplit(getUserCodeClassLoader());
 				
 				if (split != null) {
 					this.nextSplit = split;

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
new file mode 100644
index 0000000..56e5922
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Implementation using {@link ActorGateway} to forward the messages.
+ */
+public class ActorGatewayCheckpointResponder implements CheckpointResponder {
+
+	private final ActorGateway actorGateway;
+
+	public ActorGatewayCheckpointResponder(ActorGateway actorGateway) {
+		this.actorGateway = Preconditions.checkNotNull(actorGateway);
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+		List<KeyGroupsStateHandle> keyGroupStateHandles) {
+
+		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
+			jobID,
+			executionAttemptID,
+			checkpointID,
+			chainedStateHandle,
+			keyGroupStateHandles);
+
+		actorGateway.tell(message);
+	}
+
+	@Override
+	public void declineCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		long checkpointTimestamp) {
+
+		DeclineCheckpoint decline = new DeclineCheckpoint(
+			jobID,
+			executionAttemptID,
+			checkpointID,
+			checkpointTimestamp);
+
+		actorGateway.tell(decline);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
new file mode 100644
index 0000000..d729dbb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Implementation using {@link ActorGateway} to forward the messages.
+ */
+public class ActorGatewayTaskExecutionStateListener implements TaskExecutionStateListener {
+
+	private final ActorGateway actorGateway;
+
+	public ActorGatewayTaskExecutionStateListener(ActorGateway actorGateway) {
+		this.actorGateway = Preconditions.checkNotNull(actorGateway);
+	}
+
+	@Override
+	public void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState) {
+		TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState);
+
+		actorGateway.tell(actorMessage);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
new file mode 100644
index 0000000..cddac55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Implementation using {@link ActorGateway} to forward the messages.
+ */
+public class ActorGatewayTaskManagerConnection implements TaskManagerConnection {
+
+	private final ActorGateway actorGateway;
+
+	public ActorGatewayTaskManagerConnection(ActorGateway actorGateway) {
+		this.actorGateway = Preconditions.checkNotNull(actorGateway);
+	}
+
+	@Override
+	public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
+		actorGateway.tell(new TaskMessages.TaskInFinalState(executionAttemptID));
+	}
+
+	@Override
+	public void notifyFatalError(String message, Throwable cause) {
+		actorGateway.tell(new TaskManagerMessages.FatalError(message, cause));
+	}
+
+	@Override
+	public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
+		actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause));
+	}
+
+	@Override
+	public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState);
+
+		actorGateway.tell(actorMessage);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
new file mode 100644
index 0000000..9d5c4e1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.List;
+
+/**
+ * Responder for checkpoint acknowledge and decline messages in the {@link Task}.
+ */
+public interface CheckpointResponder {
+
+	/**
+	 * Acknowledges the given checkpoint.
+	 *
+	 * @param jobID Job ID of the running job
+	 * @param executionAttemptID Execution attempt ID of the running task
+	 * @param checkpointID Checkpoint ID of the checkpoint
+	 * @param chainedStateHandle Chained state handle
+	 * @param keyGroupStateHandles State handles for key groups
+	 */
+	void acknowledgeCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+		List<KeyGroupsStateHandle> keyGroupStateHandles);
+
+	/**
+	 * Declines the given checkpoint.
+	 *
+	 * @param jobID Job ID of the running job
+	 * @param executionAttemptID Execution attempt ID of the running task
+	 * @param checkpointID Checkpoint ID of the checkpoint
+	 * @param checkpointTimestamp Timestamp of the checkpoint
+	 */
+	void declineCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		long checkpointTimestamp);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index d54826a..3e4ba4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -27,14 +27,12 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -74,7 +72,7 @@ public class RuntimeEnvironment implements Environment {
 	private final ResultPartitionWriter[] writers;
 	private final InputGate[] inputGates;
 	
-	private final ActorGateway jobManager;
+	private final CheckpointResponder checkpointResponder;
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
@@ -105,7 +103,7 @@ public class RuntimeEnvironment implements Environment {
 			Map<String, Future<Path>> distCacheEntries,
 			ResultPartitionWriter[] writers,
 			InputGate[] inputGates,
-			ActorGateway jobManager,
+			CheckpointResponder checkpointResponder,
 			TaskManagerRuntimeInfo taskManagerInfo,
 			TaskMetricGroup metrics,
 			Task containingTask) {
@@ -127,7 +125,7 @@ public class RuntimeEnvironment implements Environment {
 		this.distCacheEntries = checkNotNull(distCacheEntries);
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
-		this.jobManager = checkNotNull(jobManager);
+		this.checkpointResponder = checkNotNull(checkpointResponder);
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
 		this.containingTask = containingTask;
 		this.metrics = metrics;
@@ -251,14 +249,13 @@ public class RuntimeEnvironment implements Environment {
 			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
 			List<KeyGroupsStateHandle> keyGroupStateHandles) {
 
-		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-				jobId,
-				executionId,
-				checkpointId,
-				chainedStateHandle,
-				keyGroupStateHandles);
 
-		jobManager.tell(message);
+		checkpointResponder.acknowledgeCheckpoint(
+			jobId,
+			executionId,
+			checkpointId,
+			chainedStateHandle,
+			keyGroupStateHandles);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d09e03c..9994b7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -54,11 +54,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
-import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -68,8 +63,6 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.net.URL;
 import java.util.HashMap;
@@ -174,17 +167,17 @@ public class Task implements Runnable {
 
 	private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
 
-	/** Gateway to the TaskManager that spawned this task */
-	private final ActorGateway taskManager;
+	/** Connection to the task manager */
+	private final TaskManagerConnection taskManagerConnection;
 
-	/** Gateway to the JobManager */
-	private final ActorGateway jobManager;
+	/** Input split provider for the task */
+	private final InputSplitProvider inputSplitProvider;
 
-	/** All actors that want to be notified about changes in the task's execution state */
-	private final List<ActorGateway> executionListenerActors;
+	/** Checkpoint notifier used to communicate with the CheckpointCoordinator */
+	private final CheckpointResponder checkpointResponder;
 
-	/** The timeout for all ask operations on actors */
-	private final FiniteDuration actorAskTimeout;
+	/** All listener that want to be notified about changes in the task's execution state */
+	private final List<TaskExecutionStateListener> taskExecutionStateListeners;
 
 	/** The library cache, from which the task can request its required JAR files */
 	private final LibraryCacheManager libraryCache;
@@ -244,20 +237,21 @@ public class Task implements Runnable {
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
 	 * be undone in the case of a failing task deployment.</p>
 	 */
-	public Task(TaskDeploymentDescriptor tdd,
-				MemoryManager memManager,
-				IOManager ioManager,
-				NetworkEnvironment networkEnvironment,
-				JobManagerCommunicationFactory jobManagerCommunicationFactory,
-				BroadcastVariableManager bcVarManager,
-				ActorGateway taskManagerActor,
-				ActorGateway jobManagerActor,
-				FiniteDuration actorAskTimeout,
-				LibraryCacheManager libraryCache,
-				FileCache fileCache,
-				TaskManagerRuntimeInfo taskManagerConfig,
-				TaskMetricGroup metricGroup)
-	{
+	public Task(
+		TaskDeploymentDescriptor tdd,
+		MemoryManager memManager,
+		IOManager ioManager,
+		NetworkEnvironment networkEnvironment,
+		JobManagerCommunicationFactory jobManagerCommunicationFactory,
+		BroadcastVariableManager bcVarManager,
+		TaskManagerConnection taskManagerConnection,
+		InputSplitProvider inputSplitProvider,
+		CheckpointResponder checkpointResponder,
+		LibraryCacheManager libraryCache,
+		FileCache fileCache,
+		TaskManagerRuntimeInfo taskManagerConfig,
+		TaskMetricGroup metricGroup) {
+
 		this.taskInfo = checkNotNull(tdd.getTaskInfo());
 		this.jobId = checkNotNull(tdd.getJobID());
 		this.vertexId = checkNotNull(tdd.getVertexID());
@@ -281,16 +275,16 @@ public class Task implements Runnable {
 		this.broadcastVariableManager = checkNotNull(bcVarManager);
 		this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
 
-		this.jobManager = checkNotNull(jobManagerActor);
-		this.taskManager = checkNotNull(taskManagerActor);
-		this.actorAskTimeout = checkNotNull(actorAskTimeout);
+		this.inputSplitProvider = checkNotNull(inputSplitProvider);
+		this.checkpointResponder = checkNotNull(checkpointResponder);
+		this.taskManagerConnection = checkNotNull(taskManagerConnection);
 
 		this.libraryCache = checkNotNull(libraryCache);
 		this.fileCache = checkNotNull(fileCache);
 		this.network = checkNotNull(networkEnvironment);
 		this.taskManagerConfig = checkNotNull(taskManagerConfig);
 
-		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
+		this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
 		this.metrics = metricGroup;
 
 		// create the reader and writer structures
@@ -539,19 +533,16 @@ public class Task implements Runnable {
 			//  call the user code initialization methods
 			// ----------------------------------------------------------------
 
-			TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager,
-					jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
-
 			TaskKvStateRegistry kvStateRegistry = network
 					.createKvStateTaskRegistry(jobId, getJobVertexId());
 
-			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
-					executionConfig, taskInfo, jobConfiguration, taskConfiguration,
-					userCodeClassLoader, memoryManager, ioManager,
-					broadcastVariableManager, accumulatorRegistry,
-					kvStateRegistry,
-					splitProvider, distributedCacheEntries,
-					writers, inputGates, jobManager, taskManagerConfig, metrics, this);
+			Environment env = new RuntimeEnvironment(
+				jobId, vertexId, executionId, executionConfig, taskInfo,
+				jobConfiguration, taskConfiguration, userCodeClassLoader,
+				memoryManager, ioManager, broadcastVariableManager,
+				accumulatorRegistry, kvStateRegistry, inputSplitProvider,
+				distributedCacheEntries, writers, inputGates,
+				checkpointResponder, taskManagerConfig, metrics, this);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);
@@ -588,11 +579,9 @@ public class Task implements Runnable {
 				throw new CancelTaskException();
 			}
 
-			// notify everyone that we switched to running. especially the TaskManager needs
-			// to know this!
+			// notify everyone that we switched to running
 			notifyObservers(ExecutionState.RUNNING, null);
-			taskManager.tell(new UpdateTaskExecutionState(
-					new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)));
+			taskManagerConnection.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
 			// make sure the user code classloader is accessible thread-locally
 			executingThread.setContextClassLoader(userCodeClassLoader);
@@ -785,11 +774,11 @@ public class Task implements Runnable {
 	}
 
 	private void notifyFinalState() {
-		taskManager.tell(new TaskInFinalState(executionId));
+		taskManagerConnection.notifyFinalState(executionId);
 	}
 
 	private void notifyFatalError(String message, Throwable cause) {
-		taskManager.tell(new FatalError(message, cause));
+		taskManagerConnection.notifyFatalError(message, cause);
 	}
 
 	// ----------------------------------------------------------------------------------------------------------------
@@ -815,7 +804,7 @@ public class Task implements Runnable {
 						((StoppableTask)Task.this.invokable).stop();
 					} catch(RuntimeException e) {
 						LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
-						taskManager.tell(new FailTask(executionId, e));
+						taskManagerConnection.failTask(executionId, e);
 					}
 				}
 			};
@@ -910,8 +899,8 @@ public class Task implements Runnable {
 	//  State Listeners
 	// ------------------------------------------------------------------------
 
-	public void registerExecutionListener(ActorGateway listener) {
-		executionListenerActors.add(listener);
+	public void registerExecutionListener(TaskExecutionStateListener listener) {
+		taskExecutionStateListeners.add(listener);
 	}
 
 	private void notifyObservers(ExecutionState newState, Throwable error) {
@@ -923,10 +912,9 @@ public class Task implements Runnable {
 		}
 
 		TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error);
-		UpdateTaskExecutionState actorMessage = new UpdateTaskExecutionState(stateUpdate);
 
-		for (ActorGateway listener : executionListenerActors) {
-			listener.tell(actorMessage);
+		for (TaskExecutionStateListener listener : taskExecutionStateListeners) {
+			listener.notifyTaskExecutionStateChanged(stateUpdate);
 		}
 	}
 
@@ -936,7 +924,7 @@ public class Task implements Runnable {
 
 	/**
 	 * Calls the invokable to trigger a checkpoint, if the invokable implements the interface
-	 * {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}.
+	 * {@link StatefulTask}.
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
@@ -957,8 +945,7 @@ public class Task implements Runnable {
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
 							if (!success) {
-								DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
-								jobManager.tell(decline);
+								checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
 							}
 						}
 						catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
new file mode 100644
index 0000000..9fa9c90
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+public interface TaskExecutionStateListener {
+
+	/**
+	 * Called whenever the task's execution state changes
+	 *
+	 * @param taskExecutionState describing the task execution state change
+	 */
+	void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index cb78c16..60beae0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -27,55 +27,55 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+/**
+ * Implementation using {@link ActorGateway} to forward the messages.
+ */
 public class TaskInputSplitProvider implements InputSplitProvider {
 
 	private final ActorGateway jobManager;
 	
-	private final JobID jobId;
+	private final JobID jobID;
 	
-	private final JobVertexID vertexId;
+	private final JobVertexID vertexID;
 
 	private final ExecutionAttemptID executionID;
 
-	private final ClassLoader usercodeClassLoader;
-	
 	private final FiniteDuration timeout;
-	
+
+
 	public TaskInputSplitProvider(
-			ActorGateway jobManager,
-			JobID jobId,
-			JobVertexID vertexId,
-			ExecutionAttemptID executionID,
-			ClassLoader userCodeClassLoader,
-			FiniteDuration timeout)
-	{
-		this.jobManager = jobManager;
-		this.jobId = jobId;
-		this.vertexId = vertexId;
-		this.executionID = executionID;
-		this.usercodeClassLoader = userCodeClassLoader;
-		this.timeout = timeout;
+		ActorGateway jobManager,
+		JobID jobID,
+		JobVertexID vertexID,
+		ExecutionAttemptID executionID,
+		FiniteDuration timeout) {
+
+		this.jobManager = Preconditions.checkNotNull(jobManager);
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.vertexID = Preconditions.checkNotNull(vertexID);
+		this.executionID = Preconditions.checkNotNull(executionID);
+		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
 	@Override
-	public InputSplit getNextInputSplit() {
+	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+		Preconditions.checkNotNull(userCodeClassLoader);
+
 		try {
 			final Future<Object> response = jobManager.ask(
-					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
+					new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
 					timeout);
 
 			final Object result = Await.result(response, timeout);
 
-			if(!(result instanceof JobManagerMessages.NextInputSplit)){
-				throw new RuntimeException("RequestNextInputSplit requires a response of type " +
-						"NextInputSplit. Instead response is of type " + result.getClass() + ".");
-			} else {
+			if(result instanceof JobManagerMessages.NextInputSplit){
 				final JobManagerMessages.NextInputSplit nextInputSplit =
-						(JobManagerMessages.NextInputSplit) result;
+					(JobManagerMessages.NextInputSplit) result;
 
 				byte[] serializedData = nextInputSplit.splitData();
 
@@ -83,9 +83,12 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 					return null;
 				} else {
 					Object deserialized = InstantiationUtil.deserializeObject(serializedData,
-							usercodeClassLoader);
+						userCodeClassLoader);
 					return (InputSplit) deserialized;
 				}
+			} else {
+				throw new Exception("RequestNextInputSplit requires a response of type " +
+					"NextInputSplit. Instead response is of type " + result.getClass() + '.');
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Requesting the next InputSplit failed.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
new file mode 100644
index 0000000..dc1b40f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Interface for the communication of the {@link Task} with the {@link TaskManager}.
+ */
+public interface TaskManagerConnection {
+
+	/**
+	 * Notifies the task manager that the given task is in a final state.
+	 *
+	 * @param executionAttemptID Execution attempt ID of the task
+	 */
+	void notifyFinalState(ExecutionAttemptID executionAttemptID);
+
+	/**
+	 * Notifies the task manager about a fatal error occurred in the task.
+	 *
+	 * @param message Message to report
+	 * @param cause Cause of the fatal error
+	 */
+	void notifyFatalError(String message, Throwable cause);
+
+	/**
+	 * Tells the task manager to fail the given task.
+	 *
+	 * @param executionAttemptID Execution attempt ID of the task to fail
+	 * @param cause Cause of the failure
+	 */
+	void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
+
+	/**
+	 * Notifies the task manager about the task execution state update.
+	 *
+	 * @param taskExecutionState Task execution state update
+	 */
+	void updateTaskExecutionState(TaskExecutionState taskExecutionState);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index de85f30..8ebdd80 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1149,6 +1149,17 @@ class TaskManager(
       
       val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd)
 
+      val inputSplitProvider = new TaskInputSplitProvider(
+        jobManagerGateway,
+        tdd.getJobID,
+        tdd.getVertexID,
+        tdd.getExecutionId,
+        config.timeout)
+
+      val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
+
+      val taskManagerConnection = new ActorGatewayTaskManagerConnection(selfGateway)
+
       val task = new Task(
         tdd,
         memoryManager,
@@ -1156,9 +1167,9 @@ class TaskManager(
         network,
         jmFactory,
         bcVarManager,
-        selfGateway,
-        jobManagerGateway,
-        config.timeout,
+        taskManagerConnection,
+        inputSplitProvider,
+        checkpointResponder,
         libCache,
         fileCache,
         runtimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockInputSplitProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockInputSplitProvider.java
index 1d405f0..a17484f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockInputSplitProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockInputSplitProvider.java
@@ -84,7 +84,7 @@ public class MockInputSplitProvider implements InputSplitProvider {
 
 
 	@Override
-	public InputSplit getNextInputSplit() {
+	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
 
 		if (this.nextSplit < this.inputSplits.length) {
 			return this.inputSplits[this.nextSplit++];
@@ -92,5 +92,4 @@ public class MockInputSplitProvider implements InputSplitProvider {
 
 		return null;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9501c7c..a5f4019 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -32,8 +33,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -52,12 +51,9 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -170,16 +166,15 @@ public class TaskAsyncCallTest {
 				Collections.<URL>emptyList(),
 				0);
 
-		ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
 		return new Task(tdd,
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,
 			jobManagerCommunicationFactory,
 			mock(BroadcastVariableManager.class),
-			taskManagerGateway,
-			DummyActorGateway.INSTANCE,
-			new FiniteDuration(60, TimeUnit.SECONDS),
+			mock(TaskManagerConnection.class),
+			mock(InputSplitProvider.class),
+			mock(CheckpointResponder.class),
 			libCache,
 			mock(FileCache.class),
 			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
index 4ccce1d..642300d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
@@ -47,22 +47,22 @@ public class TaskInputSplitProviderTest {
 
 
 		final TaskInputSplitProvider provider = new TaskInputSplitProvider(
-				gateway,
-				jobID,
-				vertexID,
-				executionID,
-				getClass().getClassLoader(),
-				timeout
-		);
+			gateway,
+			jobID,
+			vertexID,
+			executionID,
+			timeout);
 
 		// The jobManager will return a
-		InputSplit nextInputSplit = provider.getNextInputSplit();
+		InputSplit nextInputSplit = provider.getNextInputSplit(getClass().getClassLoader());
 
 		assertTrue(nextInputSplit == null);
 	}
 
 	public static class NullInputSplitGateway extends BaseTestingActorGateway {
 
+		private static final long serialVersionUID = -7733997150554492926L;
+
 		public NullInputSplitGateway() {
 			super(TestingUtils.defaultExecutionContext());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index cfa7fb6..d041465 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -27,7 +28,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -73,9 +73,9 @@ public class TaskStopTest {
 			mock(NetworkEnvironment.class),
 			mock(JobManagerCommunicationFactory.class),
 			mock(BroadcastVariableManager.class),
-			mock(ActorGateway.class),
-			mock(ActorGateway.class),
-			mock(FiniteDuration.class),
+			mock(TaskManagerConnection.class),
+			mock(InputSplitProvider.class),
+			mock(CheckpointResponder.class),
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
 			mock(TaskManagerRuntimeInfo.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 9e8f8f8..e5fdf32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -93,6 +94,9 @@ public class TaskTest {
 	private ActorGateway jobManagerGateway;
 	private ActorGateway listenerGateway;
 
+	private ActorGatewayTaskExecutionStateListener listener;
+	private ActorGatewayTaskManagerConnection taskManagerConnection;
+
 	private BlockingQueue<Object> taskManagerMessages;
 	private BlockingQueue<Object> jobManagerMessages;
 	private BlockingQueue<Object> listenerMessages;
@@ -105,6 +109,9 @@ public class TaskTest {
 		taskManagerGateway = new ForwardingActorGateway(taskManagerMessages);
 		jobManagerGateway = new ForwardingActorGateway(jobManagerMessages);
 		listenerGateway = new ForwardingActorGateway(listenerMessages);
+
+		listener = new ActorGatewayTaskExecutionStateListener(listenerGateway);
+		taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway);
 		
 		awaitLatch = new OneShotLatch();
 		triggerLatch = new OneShotLatch();
@@ -135,7 +142,7 @@ public class TaskTest {
 			assertFalse(task.isCanceledOrFailed());
 			assertNull(task.getFailureCause());
 			
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 			
 			// go into the run method. we should switch to DEPLOYING, RUNNING, then
 			// FINISHED, and all should be good
@@ -210,7 +217,7 @@ public class TaskTest {
 			assertFalse(task.isCanceledOrFailed());
 			assertNull(task.getFailureCause());
 
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			// should fail
 			task.run();
@@ -253,7 +260,7 @@ public class TaskTest {
 			
 			Task task = createTask(TestInvokableCorrect.class, libCache, network, jobManagerCommunicationFactory);
 
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			task.run();
 
@@ -274,7 +281,7 @@ public class TaskTest {
 	public void testInvokableInstantiationFailed() {
 		try {
 			Task task = createTask(InvokableNonInstantiable.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			task.run();
 
@@ -295,7 +302,7 @@ public class TaskTest {
 	public void testExecutionFailsInInvoke() {
 		try {
 			Task task = createTask(InvokableWithExceptionInInvoke.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 			
 			task.run();
 
@@ -319,7 +326,7 @@ public class TaskTest {
 	public void testCancelDuringInvoke() {
 		try {
 			Task task = createTask(InvokableBlockingInInvoke.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			// run the task asynchronous
 			task.startTaskThread();
@@ -353,7 +360,7 @@ public class TaskTest {
 	public void testFailExternallyDuringInvoke() {
 		try {
 			Task task = createTask(InvokableBlockingInInvoke.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			// run the task asynchronous
 			task.startTaskThread();
@@ -386,7 +393,7 @@ public class TaskTest {
 	public void testCanceledAfterExecutionFailedInInvoke() {
 		try {
 			Task task = createTask(InvokableWithExceptionInInvoke.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			task.run();
 
@@ -413,7 +420,7 @@ public class TaskTest {
 	public void testExecutionFailesAfterCanceling() {
 		try {
 			Task task = createTask(InvokableWithExceptionOnTrigger.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			// run the task asynchronous
 			task.startTaskThread();
@@ -450,7 +457,7 @@ public class TaskTest {
 	public void testExecutionFailsAfterTaskMarkedFailed() {
 		try {
 			Task task = createTask(InvokableWithExceptionOnTrigger.class);
-			task.registerExecutionListener(listenerGateway);
+			task.registerExecutionListener(listener);
 
 			// run the task asynchronous
 			task.startTaskThread();
@@ -618,21 +625,30 @@ public class TaskTest {
 		JobManagerCommunicationFactory jobManagerCommunicationFactory) {
 		
 		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable);
+
+		InputSplitProvider inputSplitProvider = new TaskInputSplitProvider(
+			jobManagerGateway,
+			tdd.getJobID(),
+			tdd.getVertexID(),
+			tdd.getExecutionId(),
+			new FiniteDuration(60, TimeUnit.SECONDS));
+
+		CheckpointResponder checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
 		
 		return new Task(
-				tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				networkEnvironment,
+			tdd,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			networkEnvironment,
 			jobManagerCommunicationFactory,
-				mock(BroadcastVariableManager.class),
-				taskManagerGateway,
-				jobManagerGateway,
-				new FiniteDuration(60, TimeUnit.SECONDS),
-				libCache,
-				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-				mock(TaskMetricGroup.class));
+			mock(BroadcastVariableManager.class),
+			taskManagerConnection,
+			inputSplitProvider,
+			checkpointResponder,
+			libCache,
+			mock(FileCache.class),
+			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			mock(TaskMetricGroup.class));
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index e3e5c54..343affe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -146,7 +146,7 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 					return true;
 				}
 
-				InputSplit split = provider.getNextInputSplit();
+				InputSplit split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
 
 				if (split != null) {
 					this.nextSplit = split;

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index a41c7db..d1131b4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -84,6 +84,7 @@ public class InputFormatSourceFunctionTest {
 
 	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer,InputSplit> {
 
+		private static final long serialVersionUID = 7408902249499583273L;
 		private boolean isConfigured = false;
 		private boolean isInputFormatOpen = false;
 		private boolean isSplitOpen = false;
@@ -128,6 +129,8 @@ public class InputFormatSourceFunctionTest {
 			for (int i = 0; i < minNumSplits; i++) {
 				final int idx = i;
 				splits[idx] = new InputSplit() {
+					private static final long serialVersionUID = -1480792932361908285L;
+
 					@Override
 					public int getSplitNumber() {
 						return idx;
@@ -270,7 +273,7 @@ public class InputFormatSourceFunctionTest {
 
 			return new InputSplitProvider() {
 				@Override
-				public InputSplit getNextInputSplit() {
+				public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
 					if (nextSplit < inputSplits.length) {
 						return inputSplits[nextSplit++];
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index e1c9407..47f1bd5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -32,10 +32,10 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -44,7 +44,9 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -56,15 +58,12 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -153,9 +152,9 @@ public class InterruptSensitiveRestoreTest {
 			networkEnvironment,
 			mock(JobManagerCommunicationFactory.class),
 			mock(BroadcastVariableManager.class),
-			mock(ActorGateway.class),
-			mock(ActorGateway.class),
-			new FiniteDuration(10, TimeUnit.SECONDS),
+				mock(TaskManagerConnection.class),
+				mock(InputSplitProvider.class),
+				mock(CheckpointResponder.class),
 			new FallbackLibraryCacheManager(),
 			new FileCache(new Configuration()),
 			new TaskManagerRuntimeInfo(

http://git-wip-us.apache.org/repos/asf/flink/blob/0735b5b9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 0a9d2fa..0c79c4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -41,12 +41,16 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
+import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -98,12 +102,12 @@ public class StreamTaskTest {
 
 		Task task = createTask(SourceStreamTask.class, cfg);
 
-		ExecutionStateListener executionStateListener = new ExecutionStateListener();
+		TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener();
 
-		task.registerExecutionListener(executionStateListener);
+		task.registerExecutionListener(testingExecutionStateListener);
 		task.startTaskThread();
 
-		Future<ExecutionState> running = executionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
+		Future<ExecutionState> running = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
 
 		// wait until the task thread reached state RUNNING
 		ExecutionState executionState = Await.result(running, deadline.timeLeft());
@@ -118,7 +122,7 @@ public class StreamTaskTest {
 		// hit the task before the operator is deserialized
 		task.cancelExecution();
 
-		Future<ExecutionState> canceling = executionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
+		Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
 
 		executionState = Await.result(canceling, deadline.timeLeft());
 
@@ -137,9 +141,7 @@ public class StreamTaskTest {
 	//  Test Utilities
 	// ------------------------------------------------------------------------
 
-	private static class ExecutionStateListener implements ActorGateway {
-
-		private static final long serialVersionUID = 8926442805035692182L;
+	private static class TestingExecutionStateListener implements TaskExecutionStateListener {
 
 		ExecutionState executionState = null;
 
@@ -167,56 +169,17 @@ public class StreamTaskTest {
 		}
 
 		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			return null;
-		}
-
-		@Override
-		public void tell(Object message) {
-			this.tell(message, null);
-		}
-
-		@Override
-		public void tell(Object message, ActorGateway sender) {
-			if (message instanceof TaskMessages.UpdateTaskExecutionState) {
-				TaskMessages.UpdateTaskExecutionState updateTaskExecutionState = (TaskMessages.UpdateTaskExecutionState) message;
-
-				synchronized (priorityQueue) {
-					this.executionState = updateTaskExecutionState.taskExecutionState().getExecutionState();
+		public void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState) {
+			synchronized (priorityQueue) {
+				this.executionState = taskExecutionState.getExecutionState();
 
-					while (!priorityQueue.isEmpty() && priorityQueue.peek().f0.ordinal() <= this.executionState.ordinal()) {
-						Promise<ExecutionState> promise = priorityQueue.poll().f1;
+				while (!priorityQueue.isEmpty() && priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
+					Promise<ExecutionState> promise = priorityQueue.poll().f1;
 
-						promise.success(this.executionState);
-					}
+					promise.success(executionState);
 				}
 			}
 		}
-
-		@Override
-		public void forward(Object message, ActorGateway sender) {
-
-		}
-
-		@Override
-		public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-			return null;
-		}
-
-		@Override
-		public String path() {
-			return null;
-		}
-
-		@Override
-		public ActorRef actor() {
-			return null;
-		}
-
-		@Override
-		public UUID leaderSessionID() {
-			return null;
-		}
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception {
@@ -254,9 +217,9 @@ public class StreamTaskTest {
 			network,
 			jobManagerCommunicationFactory,
 			mock(BroadcastVariableManager.class),
-			new DummyGateway(),
-			new DummyGateway(),
-			new FiniteDuration(60, TimeUnit.SECONDS),
+			mock(TaskManagerConnection.class),
+			mock(InputSplitProvider.class),
+			mock(CheckpointResponder.class),
 			libCache,
 			mock(FileCache.class),
 			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),


Mime
View raw message