flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [50/82] [abbrv] incubator-flink git commit: Made ExecutionGraph, Execution, ExecutionJobVertex, ExecutionVertex, AllocatedSlot, Instance, CoLocationConstraint, SharedSlot and SlotSharingGroupAssignment serializable. Integrated Kryo to be used to serialize Akka messages.
Date Thu, 18 Dec 2014 18:45:46 GMT
Made ExecutionGraph, Execution, ExecutionJobVertex, ExecutionVertex, AllocatedSlot, Instance, CoLocationConstraint, SharedSlot and SlotSharingGroupAssignment serializable. Integrated Kryo to be used to serialize Akka messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f726e552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f726e552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f726e552

Branch: refs/heads/master
Commit: f726e552aae7d13032072b240c16bbf16b78bd5c
Parents: 8d414d7
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Dec 3 12:30:17 2014 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  12 +-
 .../runtime/accumulators/AccumulatorEvent.java  |  35 +--
 .../flink/runtime/executiongraph/Execution.java |   5 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  21 +-
 .../executiongraph/ExecutionJobVertex.java      |  12 +-
 .../runtime/executiongraph/ExecutionVertex.java |   8 +-
 .../flink/runtime/instance/AllocatedSlot.java   |   5 +-
 .../apache/flink/runtime/instance/Instance.java |  13 +-
 .../scheduler/CoLocationConstraint.java         |   6 +-
 .../jobmanager/scheduler/SharedSlot.java        |   5 +-
 .../scheduler/SlotSharingGroupAssignment.java   |   7 +-
 .../runtime/operators/RegularPactTask.java      |   6 +-
 .../taskmanager/TaskInputSplitProvider.java     |   9 +-
 .../apache/flink/runtime/ActorLogMessages.scala |   4 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 223 ++++++++++++++++---
 .../flink/runtime/akka/KryoInitializer.scala    | 199 ++++++++++++++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  49 ++--
 .../runtime/messages/JobmanagerMessages.scala   |   5 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  25 ++-
 .../flink/runtime/taskmanager/TaskManager.scala |   5 +-
 .../testingUtils/KryoTestingInitializer.scala   |  46 ++++
 .../runtime/testingUtils/TestingCluster.scala   |   8 +
 .../testingUtils/TestingJobManager.scala        |  12 +-
 .../runtime/testingUtils/TestingUtils.scala     |  31 +++
 .../test/util/ForkableFlinkMiniCluster.scala    |   6 +-
 25 files changed, 636 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ab1ed78..15b76f7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -578,21 +578,21 @@ public final class ConfigConstants {
 	
 	// ------------------------------ Akka Values ------------------------------
 
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "5000 ms";
+	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
 
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "100 s";
+	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
 
 	public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
 	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
 
-	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "100 s";
+	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 min";
 
-	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 300.0;
+	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
 
-	public static String DEFAULT_AKKA_TCP_TIMEOUT = "15 s";
+	public static String DEFAULT_AKKA_TCP_TIMEOUT = "100 s";
 
-	public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 10;
+	public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
 
 	public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
index 2a2e81e..761248b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
@@ -146,25 +146,34 @@ public class AccumulatorEvent implements Serializable {
 	}
 
 	private void writeObject(java.io.ObjectOutputStream out) throws IOException{
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		ObjectOutputStream oos = new ObjectOutputStream(baos);
-
 		out.writeObject(jobID);
 
-		oos.writeInt(accumulators.size());
+		byte[] buffer = null;
 
-		for(Map.Entry<String, Accumulator<?, ?>> entry: this.accumulators.entrySet()){
-			oos.writeUTF(entry.getKey());
-			oos.writeUTF(entry.getValue().getClass().getName());
+		if(accumulators != null) {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream oos = new ObjectOutputStream(baos);
 
-			entry.getValue().write(oos);
-		}
+			oos.writeInt(accumulators.size());
+
+			for (Map.Entry<String, Accumulator<?, ?>> entry : this.accumulators.entrySet()) {
+				oos.writeUTF(entry.getKey());
+				oos.writeUTF(entry.getValue().getClass().getName());
 
-		oos.flush();
-		oos.close();
-		baos.close();
+				entry.getValue().write(oos);
+			}
+
+			oos.flush();
+			oos.close();
+			baos.close();
 
-		byte[] buffer = baos.toByteArray();
+			buffer = baos.toByteArray();
+		}else if(serializedData != null){
+			buffer = serializedData;
+		}else{
+			throw new RuntimeException("The AccumulatorEvent's accumulator is null and there is " +
+					"no serialized data attached to it.");
+		}
 
 		out.writeInt(buffer.length);
 		out.write(buffer);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2024d0a..9100000 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -27,6 +27,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 
+import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -74,7 +75,9 @@ import scala.concurrent.duration.FiniteDuration;
  * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution {
+public class Execution implements Serializable {
+
+	static final long serialVersionUID = 42L;
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7d38eac..dd9fdb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -60,6 +60,7 @@ import static akka.dispatch.Futures.future;
 
 
 public class ExecutionGraph implements Serializable {
+	static final long serialVersionUID = 42L;
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -76,7 +77,7 @@ public class ExecutionGraph implements Serializable {
 	private final String jobName;
 
 	/** The job configuration that was originally attached to the JobGraph. */
-	private final Configuration jobConfiguration;
+	private transient final Configuration jobConfiguration;
 	
 	/** The classloader of the user code. */
 	private final ClassLoader userClassLoader;
@@ -88,22 +89,24 @@ public class ExecutionGraph implements Serializable {
 	private final List<ExecutionJobVertex> verticesInCreationOrder;
 
 	/** All intermediate results that are part of this graph */
-	private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
+	private transient final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>
+	intermediateResults;
 	
 	/** The currently executed tasks, for callbacks */
-	private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
+	private transient final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
 
-	private final Map<ChannelID, ExecutionEdge> edges = new HashMap<ChannelID, ExecutionEdge>();
+	private transient final Map<ChannelID, ExecutionEdge> edges = new HashMap<ChannelID,
+			ExecutionEdge>();
 	
-	private final List<BlobKey> requiredJarFiles;
+	private transient final List<BlobKey> requiredJarFiles;
 	
-	private final List<ActorRef> jobStatusListenerActors;
+	private transient final List<ActorRef> jobStatusListenerActors;
 
-	private final List<ActorRef> executionListenerActors;
+	private transient final List<ActorRef> executionListenerActors;
 	
 	private final long[] stateTimestamps;
 	
-	private final Object progressLock = new Object();
+	private transient final Object progressLock = new Object();
 	
 	private int nextVertexToFinish;
 	
@@ -116,7 +119,7 @@ public class ExecutionGraph implements Serializable {
 	private volatile Throwable failureCause;
 	
 	
-	private Scheduler scheduler;
+	private transient Scheduler scheduler;
 	
 	private boolean allowQueuedScheduling = true;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 1ff63ee..97eb407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -39,12 +40,13 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.slf4j.Logger;
 
 
-public class ExecutionJobVertex {
+public class ExecutionJobVertex implements Serializable {
+	static final long serialVersionUID = 42L;
 	
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
 	
-	private final Object stateMonitor = new Object();
+	private transient final Object stateMonitor = new Object();
 	
 	private final ExecutionGraph graph;
 	
@@ -52,9 +54,9 @@ public class ExecutionJobVertex {
 	
 	private final ExecutionVertex[] taskVertices;
 
-	private final IntermediateResult[] producedDataSets;
+	private transient final IntermediateResult[] producedDataSets;
 	
-	private final List<IntermediateResult> inputs;
+	private transient final List<IntermediateResult> inputs;
 	
 	private final int parallelism;
 	
@@ -68,7 +70,7 @@ public class ExecutionJobVertex {
 	
 	private final InputSplit[] inputSplits;
 	
-	private InputSplitAssigner splitAssigner;
+	private transient InputSplitAssigner splitAssigner;
 	
 	
 	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism) throws JobException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a6930f1..0cf944d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,6 +22,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -48,7 +49,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
  * which time it spawns an {@link Execution}.
  */
-public class ExecutionVertex {
+public class ExecutionVertex implements Serializable {
+	static final long serialVersionUID = 42L;
 
 	@SuppressWarnings("unused")
 	private static final Logger LOG = ExecutionGraph.LOG;
@@ -59,9 +61,9 @@ public class ExecutionVertex {
 	
 	private final ExecutionJobVertex jobVertex;
 	
-	private final IntermediateResultPartition[] resultPartitions;
+	private transient final IntermediateResultPartition[] resultPartitions;
 	
-	private final ExecutionEdge[][] inputEdges;
+	private transient final ExecutionEdge[][] inputEdges;
 	
 	private final int subTaskIndex;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index b3ffbb1..f1481f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -28,7 +29,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 /**
  * An allocated slot is the unit in which resources are allocated on instances.
  */
-public class AllocatedSlot {
+public class AllocatedSlot implements Serializable {
+
+	static final long serialVersionUID = 42L;
 	
 	private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER = 
 			AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index aaa276d..abbbc34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -32,13 +33,15 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 /**
  * An taskManager represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
  */
-public class Instance {
+public class Instance implements Serializable {
+
+	static final long serialVersionUID = 42L;
 	
 	/** The lock on which to synchronize allocations and failure state changes */
-	private final Object instanceLock = new Object();
+	private transient final Object instanceLock = new Object();
 	
 	/** The actor ref to the task manager represented by this taskManager. */
-	private final ActorRef taskManager;
+	private transient final ActorRef taskManager;
 
 	/** The instance connection information for the data transfer. */
 	private final InstanceConnectionInfo connectionInfo;
@@ -53,14 +56,14 @@ public class Instance {
 	private final int numberOfSlots;
 
 	/** A list of available slot positions */
-	private final Queue<Integer> availableSlots;
+	private transient final Queue<Integer> availableSlots;
 	
 	/** Allocated slots on this taskManager */
 	private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
 
 	
 	/** A listener to be notified upon new slot availability */
-	private SlotAvailabilityListener slotAvailabilityListener;
+	private transient SlotAvailabilityListener slotAvailabilityListener;
 	
 	/**
 	 * Time when last heat beat has been received from the task manager running on this taskManager.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index 36430de..739ec09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -23,7 +23,11 @@ import org.apache.flink.runtime.instance.Instance;
 
 import com.google.common.base.Preconditions;
 
-public class CoLocationConstraint {
+import java.io.Serializable;
+
+public class CoLocationConstraint implements Serializable {
+
+	static final long serialVersionUID = 42L;
 	
 	private final CoLocationGroup group;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
index 3673512..1ce8465 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -30,7 +31,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  *       methods may only be called from within the synchronization scope of
  *       it associated SlotSharingGroupAssignment.
  */
-class SharedSlot {
+class SharedSlot implements Serializable {
+
+	static final long serialVersionUID = 42L;
 
 	private final AllocatedSlot allocatedSlot;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
index a07dd6c..7a0546f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroupAssignment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -38,11 +39,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.slf4j.Logger;
 
 
-public class SlotSharingGroupAssignment {
+public class SlotSharingGroupAssignment implements Serializable {
+
+	static final long serialVersionUID = 42L;
 	
 	private static final Logger LOG = Scheduler.LOG;
 	
-	private final Object lock = new Object();
+	private transient final Object lock = new Object();
 	
 	/** All slots currently allocated to this sharing group */
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 9ea4a74..74bc4cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -584,9 +584,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 
 		// Report accumulators to JobManager
-		env.getAccumulator().tell(new JobManagerMessages.ReportAccumulatorResult(new
-				AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators))),
-				ActorRef.noSender());
+		JobManagerMessages.ReportAccumulatorResult accResult = new JobManagerMessages.ReportAccumulatorResult(new
+				AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators)));
+		env.getAccumulator().tell(accResult, ActorRef.noSender());
 
 		// We also clear the accumulators, since stub instances might be reused
 		// (e.g. in iterations) and we don't want to count twice. This may not be

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 9853ded..3303c72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
 import akka.actor.ActorRef;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -36,13 +37,16 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	
 	private final JobVertexID vertexId;
 
+	private final ExecutionAttemptID executionID;
+
 	private final FiniteDuration timeout;
 	
 	public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
-								FiniteDuration timeout) {
+								ExecutionAttemptID executionID, FiniteDuration timeout) {
 		this.jobManager = jobManager;
 		this.jobId = jobId;
 		this.vertexId = vertexId;
+		this.executionID = executionID;
 		this.timeout = timeout;
 	}
 
@@ -50,7 +54,8 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	public InputSplit getNextInputSplit() {
 		try {
 			TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager,
-					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId), timeout);
+					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
+					timeout);
 
 			return nextInputSplit.inputSplit();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index 9afc060..b39c11d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -30,11 +30,11 @@ trait ActorLogMessages {
     override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
 
     override def apply(x: Any):Unit = {
-      log.debug(s"Received message $x from ${self.sender}.")
+      log.debug("Received message {} from {}.", x, self.sender)
       val start = System.nanoTime()
       _receiveWithLogMessages(x)
       val duration = (System.nanoTime() - start) / 1000000
-      log.debug(s"Handled message $x in $duration ms from ${self.sender}.")
+      log.debug(s"Handled message {} in {} ms from {}.", x, duration, self.sender)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 168dccb..0c5405e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.Callable
 
 import akka.actor.{ActorSelection, ActorRef, ActorSystem}
 import akka.pattern.{Patterns, ask => akkaAsk}
-import akka.util.Timeout
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import scala.concurrent.{ExecutionContext, Future, Await}
@@ -81,19 +80,17 @@ object AkkaUtils {
     val configString =
       s"""
          |akka {
-         |  loglevel = "$logLevel"
-         |  stdout-loglevel = "$logLevel"
+         |  loglevel = $logLevel
+         |  stdout-loglevel = $logLevel
          |
          |  log-dead-letters = $logLifecycleEvents
          |  log-dead-letters-during-shutdown = $logLifecycleEvents
          |
-         |  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$$"]
-         |
          |  remote {
          |    transport-failure-detector{
          |      acceptable-heartbeat-pause = $transportHeartbeatPause
-         |      threshold = $transportThreshold
          |      heartbeat-interval = $transportHeartbeatInterval
+         |      threshold = $transportThreshold
          |    }
          |
          |    watch-failure-detector{
@@ -107,41 +104,19 @@ object AkkaUtils {
          |        hostname = $host
          |        port = $port
          |        connection-timeout = $akkaTCPTimeout
-         |        maximum-frame-size = $akkaFramesize
+         |        maximum-frame-size = ${akkaFramesize}
          |      }
          |    }
          |
          |    log-remote-lifecycle-events = $logLifecycleEvents
-         |
          |  }
          |
          |  actor{
          |    default-dispatcher{
-         |      throughput = $akkaThroughput
-         |    }
-         |
-         |    kryo{
-         |      type = "nograph"
-         |      idstrategy = "default"
-         |      serializer-pool-size = 16
-         |      buffer-size = 4096
-         |      max-buffer-size = -1
-         |      use-manifests = false
-         |      compression = off
-         |      implicit-registration-logging = true
-         |      kryo-trace = true
-         |      kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer"
-         |    }
-         |
-         |    serialize-messages = on
-         |
-         |    serializers{
-         |      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
-         |    }
-         |
-         |    serialization-bindings {
+         |      throughput = ${akkaThroughput}
          |    }
          |  }
+         |
          |}
        """.stripMargin
 
@@ -149,7 +124,7 @@ object AkkaUtils {
   }
 
   def getDefaultActorSystemConfigString: String = {
-    s"""
+    """
        |akka {
        |  daemonic = on
        |
@@ -158,7 +133,8 @@ object AkkaUtils {
        |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
        |  stdout-loglevel = "WARNING"
        |  jvm-exit-on-fatal-error = off
-       |  log-config-on-start = off
+       |  log-config-on-start = on
+       |  serialize-messages = on
        |
        |  actor {
        |    provider = "akka.remote.RemoteActorRefProvider"
@@ -167,18 +143,195 @@ object AkkaUtils {
        |  remote{
        |    netty{
        |      tcp{
+       |        port = 0
        |        transport-class = "akka.remote.transport.netty.NettyTransport"
        |        tcp-nodelay = on
-       |
-       |        port = 0
        |        maximum-frame-size = 1MB
+       |        execution-pool-size = 4
        |      }
        |    }
        |  }
        |}
      """.stripMargin
+    }
+
+  // scalastyle:off line.size.limit
+
+  def getKryoSerializerString: String = { // HOCON: akka-kryo extension, serializers, bindings
+    """
+      |akka {
+      |
+      |  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
+      |
+      |  actor{
+      |    kryo{
+      |      type = "graph"
+      |      idstrategy = "incremental"
+      |      serializer-pool-size = 16
+      |      buffer-size = 4096
+      |      max-buffer-size = -1
+      |      use-manifests = false
+      |      compression = off
+      |      implicit-registration-logging = true
+      |      kryo-trace = false
+      |      kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer"
+      |    }
+      |
+      |
+      |    serializers{
+      |      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
+      |      java = "akka.serialization.JavaSerializer"
+      |    }
+      |
+      |    serialization-bindings {
+      |      "java.io.Serializable" = java
+      |
+      |      "java.lang.Throwable" = java
+      |      "akka.event.Logging$Error" = java
+      |      "java.lang.Integer" = kryo
+      |      "java.lang.Long" = kryo
+      |      "java.lang.Float" = kryo
+      |      "java.lang.Double" = kryo
+      |      "java.lang.Boolean" = kryo
+      |      "java.lang.Short" = kryo
+      |
+      |      "scala.Tuple2" = kryo
+      |      "scala.Tuple3" = kryo
+      |      "scala.Tuple4" = kryo
+      |      "scala.Tuple5" = kryo
+      |      "scala.Tuple6" = kryo
+      |      "scala.Tuple7" = kryo
+      |      "scala.Tuple8" = kryo
+      |      "scala.Tuple9" = kryo
+      |      "scala.Tuple10" = kryo
+      |      "scala.Tuple11" = kryo
+      |      "scala.Tuple12" = kryo
+      |      "scala.collection.BitSet" = kryo
+      |      "scala.collection.SortedSet" = kryo
+      |      "scala.util.Left" = kryo
+      |      "scala.util.Right" = kryo
+      |      "scala.collection.SortedMap" = kryo
+      |      "scala.Int" = kryo
+      |      "scala.Long" = kryo
+      |      "scala.Float" = kryo
+      |      "scala.Double" = kryo
+      |      "scala.Boolean" = kryo
+      |      "scala.Short" = kryo
+      |      "java.lang.String" = kryo
+      |      "scala.Option" = kryo
+      |      "scala.collection.immutable.Map" = kryo
+      |      "scala.collection.Traversable" = kryo
+      |      "scala.runtime.BoxedUnit" = kryo
+      |
+      |      "akka.actor.SystemGuardian$RegisterTerminationHook$" = kryo
+      |      "akka.actor.Address" = kryo
+      |      "akka.actor.Terminated" = kryo
+      |      "akka.actor.LocalActorRef" = kryo
+      |      "akka.actor.RepointableActorRef" = kryo
+      |      "akka.actor.Identify" = kryo
+      |      "akka.actor.ActorIdentity" = kryo
+      |      "akka.actor.PoisonPill$" = kryo
+      |      "akka.actor.SystemGuardian$TerminationHook$" = kryo
+      |      "akka.actor.SystemGuardian$TerminationHookDone$" = kryo
+      |      "akka.actor.AddressTerminated" = kryo
+      |      "akka.actor.Status$Failure" = kryo
+      |      "akka.remote.RemoteWatcher$ReapUnreachableTick$" = kryo
+      |      "akka.remote.RemoteWatcher$HeartbeatTick$" = kryo
+      |      "akka.remote.ReliableDeliverySupervisor$GotUid" = kryo
+      |      "akka.remote.EndpointWriter$AckIdleCheckTimer$" = kryo
+      |      "akka.remote.EndpointWriter$StoppedReading" = kryo
+      |      "akka.remote.ReliableDeliverySupervisor$Ungate$" = kryo
+      |      "akka.remote.EndpointWriter$StopReading" = kryo
+      |      "akka.remote.EndpointWriter$OutboundAck" = kryo
+      |      "akka.remote.Ack" = kryo
+      |      "akka.remote.SeqNo" = kryo
+      |      "akka.remote.EndpointWriter$FlushAndStop$" = kryo
+      |      "akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$" = kryo
+      |      "akka.remote.RemoteWatcher$WatchRemote" = kryo
+      |      "akka.remote.RemoteWatcher$UnwatchRemote" = kryo
+      |      "akka.remote.RemoteWatcher$RewatchRemote" = kryo
+      |      "akka.remote.RemoteWatcher$Rewatch" = kryo
+      |      "akka.remote.RemoteWatcher$Heartbeat$" = kryo
+      |      "akka.remote.RemoteWatcher$HeartbeatRsp" = kryo
+      |      "akka.remote.EndpointWriter$FlushAndStopTimeout$" = kryo
+      |      "akka.remote.RemoteWatcher$ExpectedFirstHeartbeat" = kryo
+      |      "akka.remote.transport.Transport$InvalidAssociationException" = kryo
+      |      "akka.dispatch.sysmsg.Terminate" = kryo
+      |      "akka.dispatch.sysmsg.Unwatch" = kryo
+      |      "akka.dispatch.sysmsg.Watch" = kryo
+      |      "akka.dispatch.sysmsg.DeathWatchNotification" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.ArchiveMessages$ArchiveExecutionGraph" = kryo
+      |      "org.apache.flink.runtime.messages.ArchiveMessages$RequestArchivedJobs$" = kryo
+      |      "org.apache.flink.runtime.messages.ArchiveMessages$ArchivedJobs" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.ExecutionGraphMessages$ExecutionStateChanged" = kryo
+      |      "org.apache.flink.runtime.messages.ExecutionGraphMessages$JobStatusChanged" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait" = kryo
+      |      "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobDetached" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$SubmitJob" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$SubmissionSuccess" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$SubmissionFailure" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$CancellationSuccess" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$CancellationFailure" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$CancelJob" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$UpdateTaskExecutionState" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestNextInputSplit" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$LookupConnectionInformation" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$ConnectionInformation" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$ReportAccumulatorResult" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestAccumulatorResults" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsFound" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$AccumulatorResultsNotFound" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestJobStatus" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$CurrentJobStatus" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestNumberRegisteredTaskManager$" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestTotalNumberOfSlots$" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestFinalJobStatus" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$JobResultCanceled" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailed" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobs$" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RunningJobs" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestJob" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$JobFound" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$JobNotFound" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RequestRegisteredTaskManagers$" = kryo
+      |      "org.apache.flink.runtime.messages.JobManagerMessages$RegisteredTaskManagers" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.JobManagerProfilerMessages$ReportProfilingData" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$NotifyWhenRegisteredAtJobManager$" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$RegisterAtJobManager$" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$CancelTask" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$SubmitTask" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$NextInputSplit" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$UnregisterTask" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$TaskOperationResult" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$Heartbeat" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$RegisteredAtJobManager$" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$SendHeartbeat$" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerMessages$LogMemoryUsage$" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$MonitorTask" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnmonitorTask" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$RegisterProfilingListener$" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$UnregisterProfilingListener$" = kryo
+      |      "org.apache.flink.runtime.messages.TaskManagerProfilerMessages$ProfileTasks$" = kryo
+      |
+      |      "org.apache.flink.runtime.messages.RegistrationMessages$RegisterTaskManager" = kryo
+      |      "org.apache.flink.runtime.messages.RegistrationMessages$AcknowledgeRegistration" = kryo
+      |    }
+      |  }
+      |}
+    """.stripMargin
   }
 
+  // scalastyle:on line.size.limit
+
   def getDefaultActorSystemConfig = {
     ConfigFactory.parseString(getDefaultActorSystemConfigString)
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
index 5f9854b..2a492fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
@@ -18,9 +18,204 @@
 
 package org.apache.flink.runtime.akka
 
-import com.esotericsoftware.kryo.Kryo
+import java.net.Inet4Address
+
+import com.esotericsoftware.kryo.serializers.JavaSerializer
+import com.esotericsoftware.kryo.{Serializer, Kryo}
+import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.core.fs.FileInputSplit
+import org.apache.flink.core.io.{LocatableInputSplit, GenericInputSplit}
+import org.apache.flink.runtime.accumulators.AccumulatorEvent
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.instance.{Instance, InstanceID, HardwareDescription,
+InstanceConnectionInfo}
+import org.apache.flink.runtime.io.network.{RemoteReceiver, ConnectionInfoLookupResponse}
+import org.apache.flink.runtime.io.network.channels.ChannelID
+import org.apache.flink.runtime.jobgraph.{JobVertexID, JobStatus, JobID, JobGraph}
+import org.apache.flink.runtime.messages.ArchiveMessages.{ArchivedJobs, RequestArchivedJobs,
+ArchiveExecutionGraph}
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged,
+JobStatusChanged}
+import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.JobManagerProfilerMessages.ReportProfilingData
+import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
+RegisterTaskManager}
+import org.apache.flink.runtime.messages.TaskManagerMessages._
+import org.apache.flink.runtime.messages.TaskManagerProfilerMessages._
+import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
+import org.apache.flink.runtime.taskmanager.{Task, TaskExecutionState}
 
 class KryoInitializer {
-  def cystomize(kryo: Kryo): Unit = {
+  def customize(kryo: Kryo): Unit = {
+    // NOTE(review): presumably invoked by akka-kryo via kryo-custom-serializer-init; confirm.
+    register(kryo)
+  }
+
+  def register(kryo: Kryo): Unit = {
+    def register(className: String): Unit = {
+      kryo.register(Class.forName(className)) // resolves the class by name at runtime
+    }
+    // Registers clazz with the given serializer, or without one when serializer is null.
+    def registerClass(clazz: Class[_], serializer: Serializer[_] = null): Unit = {
+      if(serializer != null){
+        kryo.register(clazz, serializer)
+      }else {
+        kryo.register(clazz)
+      }
+    }
+    // Scala and Akka classes are looked up by fully qualified name.
+    register("scala.Some")
+    register("scala.None$")
+    register("scala.collection.immutable.Set$EmptySet$")
+    register("scala.runtime.BoxedUnit")
+
+    register("akka.actor.SystemGuardian$RegisterTerminationHook$")
+    register("akka.actor.Address")
+    register("akka.actor.Terminated")
+    register("akka.actor.LocalActorRef")
+    register("akka.actor.RepointableActorRef")
+    register("akka.actor.Identify")
+    register("akka.actor.ActorIdentity")
+    register("akka.actor.PoisonPill$")
+    register("akka.actor.AddressTerminated")
+    register("akka.actor.Status$Failure")
+    register("akka.remote.RemoteWatcher$ReapUnreachableTick$")
+    register("akka.remote.RemoteWatcher$HeartbeatTick$")
+    register("akka.remote.ReliableDeliverySupervisor$GotUid")
+    register("akka.remote.EndpointWriter$AckIdleCheckTimer$")
+    register("akka.remote.EndpointWriter$StoppedReading")
+    register("akka.remote.ReliableDeliverySupervisor$Ungate$")
+    register("akka.remote.EndpointWriter$StopReading")
+    register("akka.remote.EndpointWriter$OutboundAck")
+    register("akka.remote.Ack")
+    register("akka.remote.SeqNo")
+    register("akka.remote.RemoteWatcher$HeartbeatRsp")
+    register("akka.actor.SystemGuardian$TerminationHook$")
+    register("akka.actor.SystemGuardian$TerminationHookDone$")
+    register("akka.remote.EndpointWriter$FlushAndStop$")
+    register("akka.remote.RemoteWatcher$WatchRemote")
+    register("akka.remote.RemoteWatcher$UnwatchRemote")
+    register("akka.remote.RemoteWatcher$Rewatch")
+    register("akka.remote.RemoteWatcher$RewatchRemote")
+    register("akka.remote.ReliableDeliverySupervisor$AttemptSysMsgRedelivery$")
+    register("akka.remote.RemoteActorRef")
+    register("akka.remote.RemoteWatcher$Heartbeat$")
+    register("akka.remote.EndpointWriter$FlushAndStopTimeout$")
+    register("akka.remote.RemoteWatcher$ExpectedFirstHeartbeat")
+    register("akka.remote.transport.Transport$InvalidAssociationException")
+    register("akka.remote.transport.AkkaProtocolException")
+    register("akka.dispatch.sysmsg.Terminate")
+    register("akka.dispatch.sysmsg.Unwatch")
+    register("akka.dispatch.sysmsg.Watch")
+    register("akka.dispatch.sysmsg.DeathWatchNotification")
+
+//    register("java.util.Collections$UnmodifiableRandomAccessList")
+
+
+    //Register Flink messages
+
+    kryo.setDefaultSerializer(classOf[JavaSerializer])
+    // NOTE(review): presumably applies to the registrations below without explicit serializer.
+    //misc types
+    registerClass(classOf[JobID])
+    registerClass(classOf[JobVertexID])
+    registerClass(classOf[ExecutionAttemptID])
+    registerClass(classOf[InstanceID])
+    registerClass(classOf[ExecutionState])
+    registerClass(classOf[JobStatus])
+    registerClass(classOf[TaskExecutionState])
+    registerClass(classOf[InstanceConnectionInfo])
+    registerClass(classOf[HardwareDescription])
+    registerClass(classOf[Inet4Address])
+    registerClass(classOf[ChannelID])
+    registerClass(classOf[ConnectionInfoLookupResponse])
+    registerClass(classOf[RemoteReceiver])
+    registerClass(classOf[AccumulatorEvent], new JavaSerializer)
+    registerClass(classOf[Instance], new JavaSerializer())
+    registerClass(classOf[JobGraph], new JavaSerializer())
+    registerClass(classOf[TaskDeploymentDescriptor], new JavaSerializer())
+    registerClass(classOf[ExecutionGraph], new JavaSerializer())
+    registerClass(classOf[ProfilingDataContainer], new JavaSerializer)
+    registerClass(classOf[Task], new JavaSerializer)
+    registerClass(classOf[GenericInputSplit], new JavaSerializer)
+    registerClass(classOf[LocatableInputSplit], new JavaSerializer)
+    registerClass(classOf[FileInputSplit], new JavaSerializer)
+    registerClass(classOf[StackTraceElement])
+    registerClass(classOf[Array[StackTraceElement]])
+
+    //Archive messages
+    registerClass(classOf[ArchiveExecutionGraph])
+    registerClass(RequestArchivedJobs.getClass)
+    registerClass(classOf[ArchivedJobs])
+
+    //ExecutionGraph messages
+    registerClass(classOf[ExecutionStateChanged])
+    registerClass(classOf[JobStatusChanged])
+
+    //JobClient messages
+    registerClass(classOf[SubmitJobAndWait])
+    registerClass(classOf[SubmitJobDetached])
+
+    // JobManager messages
+    registerClass(classOf[SubmitJob])
+    registerClass(classOf[SubmissionSuccess])
+    registerClass(classOf[SubmissionFailure])
+    registerClass(classOf[CancelJob])
+    registerClass(classOf[UpdateTaskExecutionState])
+    registerClass(classOf[RequestNextInputSplit])
+    registerClass(classOf[LookupConnectionInformation])
+    registerClass(classOf[ConnectionInformation])
+    registerClass(classOf[ReportAccumulatorResult])
+    registerClass(classOf[RequestAccumulatorResults])
+    registerClass(classOf[AccumulatorResultsFound])
+    registerClass(classOf[AccumulatorResultsNotFound])
+    registerClass(classOf[RequestJobStatus])
+    registerClass(classOf[CurrentJobStatus])
+    registerClass(RequestNumberRegisteredTaskManager.getClass)
+    registerClass(RequestTotalNumberOfSlots.getClass)
+    registerClass(RequestBlobManagerPort.getClass)
+    registerClass(classOf[RequestFinalJobStatus])
+    registerClass(classOf[JobResultSuccess])
+    registerClass(classOf[JobResultCanceled])
+    registerClass(classOf[JobResultFailed])
+    registerClass(classOf[CancellationSuccess])
+    registerClass(classOf[CancellationFailure])
+    registerClass(RequestRunningJobs.getClass)
+    registerClass(classOf[RunningJobs])
+    registerClass(classOf[RequestJob])
+    registerClass(classOf[JobFound])
+    registerClass(classOf[JobNotFound])
+    registerClass(RequestRegisteredTaskManagers.getClass)
+    registerClass(classOf[RegisteredTaskManagers])
+
+    //JobManagerProfiler messages
+    registerClass(classOf[ReportProfilingData])
+
+    //Registration messages
+    registerClass(classOf[RegisterTaskManager])
+    registerClass(classOf[AcknowledgeRegistration])
+
+    //TaskManager messages
+    registerClass(classOf[CancelTask])
+    registerClass(classOf[SubmitTask])
+    registerClass(classOf[NextInputSplit])
+    registerClass(classOf[UnregisterTask])
+    registerClass(classOf[TaskOperationResult])
+    registerClass(NotifyWhenRegisteredAtJobManager.getClass)
+    registerClass(RegisterAtJobManager.getClass)
+    registerClass(RegisteredAtJobManager.getClass)
+    registerClass(SendHeartbeat.getClass)
+    registerClass(classOf[Heartbeat])
+    registerClass(LogMemoryUsage.getClass)
+
+    //TaskManagerProfiler messages
+    registerClass(classOf[MonitorTask])
+    registerClass(classOf[UnmonitorTask])
+    registerClass(RegisterProfilingListener.getClass)
+    registerClass(UnregisterProfilingListener.getClass)
+    registerClass(ProfileTasks.getClass)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a18240e..eb0d79c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -108,7 +108,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
         hardwareInformation, numberOfSlots)
 
       // to be notified when the taskManager is no longer reachable
-//      context.watch(taskManager);
+      context.watch(taskManager);
 
       taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
     }
@@ -246,7 +246,8 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
       }else {
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
-            sender() ! executionGraph.updateState(taskExecutionState)
+            val originalSender = sender()
+            originalSender ! executionGraph.updateState(taskExecutionState)
           case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
             .getJobID} to change state to ${taskExecutionState.getExecutionState}.")
             sender() ! false
@@ -254,24 +255,41 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
       }
     }
 
-    case RequestNextInputSplit(jobID, vertexID) => {
+    case RequestNextInputSplit(jobID, vertexID, executionAttempt) => {
       val nextInputSplit = currentJobs.get(jobID) match {
-        case Some((executionGraph,_)) => executionGraph.getJobVertex(vertexID) match {
-          case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
-            case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(null)
-            case _ =>
-              log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.")
+        case Some((executionGraph,_)) =>
+          val execution = executionGraph.getRegisteredExecutions().get(executionAttempt)
+
+          if(execution == null){
+            log.error("Can not find Execution for attempt " + executionAttempt)
+            null
+          }else{
+            val slot = execution.getAssignedResource
+
+            val host = if(slot != null){
+              slot.getInstance().getInstanceConnectionInfo.getHostname
+            }else{
               null
+            }
+
+            executionGraph.getJobVertex(vertexID) match {
+              case vertex: ExecutionJobVertex => vertex.getSplitAssigner match {
+                case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(host)
+                case _ =>
+                  log.error(s"No InputSplitAssigner for vertex ID ${vertexID}.")
+                  null
+              }
+              case _ =>
+                log.error(s"Cannot find execution vertex for vertex ID ${vertexID}.")
+                null
           }
-          case _ =>
-            log.error(s"Cannot find execution vertex for vertex ID ${vertexID}.")
-            null
         }
         case None =>
           log.error(s"Cannot find execution graph for job ID ${jobID}.")
           null
       }
 
+      log.debug("Send next input split {}.", nextInputSplit)
       sender() ! NextInputSplit(nextInputSplit)
     }
 
@@ -323,8 +341,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case LookupConnectionInformation(connectionInformation, jobID, sourceChannelID) => {
       currentJobs.get(jobID) match {
         case Some((executionGraph, _)) =>
-          sender() ! ConnectionInformation(executionGraph.lookupConnectionInfoAndDeployReceivers
-            (connectionInformation, sourceChannelID))
+          val originalSender = sender()
+          originalSender ! ConnectionInformation(
+            executionGraph.lookupConnectionInfoAndDeployReceivers
+              (connectionInformation, sourceChannelID))
         case None =>
           log.error(s"Cannot find execution graph for job ID ${jobID}.")
           sender() ! ConnectionInformation(ConnectionInfoLookupResponse.createReceiverNotFound())
@@ -381,7 +401,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case Terminated(taskManager) => {
       log.info(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
-//      context.unwatch(taskManager)
+      context.unwatch(taskManager)
     }
   }
 
@@ -442,6 +462,7 @@ object JobManager {
     parser.parse(args, JobManagerCLIConfiguration()) map {
       config =>
         GlobalConfiguration.loadConfiguration(config.configDir)
+
         val configuration = GlobalConfiguration.getConfiguration()
         if (config.configDir != null && new File(config.configDir).isDirectory) {
           configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index b1c5f1f..de3abd7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.runtime.accumulators.AccumulatorEvent
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{Instance, InstanceConnectionInfo}
 import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
 import org.apache.flink.runtime.io.network.channels.ChannelID
@@ -71,7 +71,8 @@ object JobManagerMessages {
    * @param jobID
    * @param vertexID
    */
-  case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID)
+  case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID,
+                                   executionAttempt: ExecutionAttemptID)
 
   /**
    * Looks up the connection information of a task being the source of a channel specified by

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 1c5a9df..bfb9ae2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
+import com.typesafe.config.{ConfigFactory, Config}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
@@ -61,21 +62,35 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   def startTaskManager(index: Int)(implicit system: ActorSystem):
   ActorRef
 
+  def getJobManagerAkkaConfigString(): String = {
+    // JobManager IPC port from the configuration, with the constant default as second argument.
+    val jobManagerPort = configuration.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    AkkaUtils.getConfigString(HOSTNAME, jobManagerPort, configuration)
+  }
+
   def startJobManagerActorSystem(): ActorSystem = {
-    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    val configString = getJobManagerAkkaConfigString() // built once and parsed below
 
-    AkkaUtils.createActorSystem(HOSTNAME, port, configuration)
+    val config = ConfigFactory.parseString(configString)
+
+    AkkaUtils.createActorSystem(config)
   }
 
-  def startTaskManagerActorSystem(index: Int): ActorSystem = {
+  def getTaskManagerAkkaConfigString(index: Int): String = { // non-zero port is offset by index
     val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
 
-    AkkaUtils.createActorSystem(HOSTNAME, if(port != 0) port + index else port,
+    AkkaUtils.getConfigString(HOSTNAME, if(port != 0) port + index else port,
       configuration)
   }
 
+  def startTaskManagerActorSystem(index: Int): ActorSystem = {
+    // Build the config string for this index, parse it, and start the actor system from it.
+    val akkaConfigString = getTaskManagerAkkaConfigString(index)
+    AkkaUtils.createActorSystem(ConfigFactory.parseString(akkaConfigString))
+  }
+
   def getJobManager: ActorRef = {
     jobManagerActor
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 261d50a..b1cfda7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -185,7 +185,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         currentJobManager = sender()
         instanceID = id
 
-//        context.watch(currentJobManager)
+        context.watch(currentJobManager)
 
         log.info(s"TaskManager successfully registered at JobManager ${
           currentJobManager.path
@@ -261,7 +261,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           case None =>
         }
 
-        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID, timeout)
+        val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
+          executionID, timeout)
         val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
           ioManager, splitProvider, currentJobManager, bcVarManager)
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala
new file mode 100644
index 0000000..fc2ec8c
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/KryoTestingInitializer.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import com.esotericsoftware.kryo.Kryo
+import org.apache.flink.runtime.akka.KryoInitializer
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+class KryoTestingInitializer {
+  def customize(kryo: Kryo): Unit = {
+    val initializer = new KryoInitializer
+
+    initializer.customize(kryo)
+
+    kryo.register(classOf[RequestExecutionGraph])
+    kryo.register(classOf[ExecutionGraphFound])
+    kryo.register(classOf[ExecutionGraphNotFound])
+    kryo.register(classOf[WaitForAllVerticesToBeRunning])
+    kryo.register(classOf[AllVerticesRunning])
+    kryo.register(classOf[NotifyWhenJobRemoved])
+
+    kryo.register(classOf[NotifyWhenTaskRemoved])
+    kryo.register(RequestRunningTasks.getClass)
+    kryo.register(classOf[ResponseRunningTasks])
+    kryo.register(RequestBroadcastVariablesWithReferences.getClass)
+    kryo.register(classOf[ResponseBroadcastVariablesWithReferences])
+    kryo.register(classOf[CheckIfJobRemoved])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 5a51265..3c11e1f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -37,6 +37,14 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
     cfg
   }
 
+  override def getJobManagerAkkaConfigString(): String = {
+    super.getJobManagerAkkaConfigString() + TestingUtils.getTestingSerializationBindings
+  }
+
+  override def getTaskManagerAkkaConfigString(index: Int): String = {
+    super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
+  }
+
   override def startJobManager(implicit system: ActorSystem) = {
     system.actorOf(Props(new JobManager(configuration) with TestingJobManager),
       JobManager.JOB_MANAGER_NAME)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 67a8934..bfb551f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -84,12 +84,12 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
       }
 
       import context.dispatcher
-      val f = Future.sequence(responses)
-
-      val t = Await.result(f, timeout)
-
-      sender() ! true
-//      Future.fold(responses)(true)(_ & _) pipeTo sender()
+//      val f = Future.sequence(responses)
+//
+//      val t = Await.result(f, timeout)
+//
+//      sender() ! true
+      Future.fold(responses)(true)(_ & _) pipeTo sender()
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index c1565c7..fce49a0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -58,6 +58,37 @@ object TestingUtils {
     """.stripMargin
   }
 
+
+  // scalastyle:off line.size.limit
+  val getTestingSerializationBindings =
+  """
+    |akka {
+    |  actor {
+    |    kryo{
+    |      kryo-custom-serializer-init = "org.apache.flink.runtime.testingUtils.KryoTestingInitializer"
+    |    }
+    |
+    |    serialization-bindings {
+    |      "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$RequestExecutionGraph" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphFound" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$ExecutionGraphNotFound" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$WaitForAllVerticesToBeRunning" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$AllVerticesRunning" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingJobManagerMessages$NotifyWhenJobRemoved" = kryo
+    |
+    |      "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$NotifyWhenTaskRemoved" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestRunningTasks$" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseRunningTasks" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$RequestBroadcastVariablesWithReferences$" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$ResponseBroadcastVariablesWithReferences" = kryo
+    |      "org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages$CheckIfJobRemoved" = kryo
+    |    }
+    |  }
+    |}
+  """.stripMargin
+  // scalastyle:on line.size.limit
+
+
   def startTestingTaskManagerWithConfiguration(hostname: String, config: Configuration)
                                               (implicit system: ActorSystem) = {
     val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f726e552/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index f82a4a6..cf85533 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.{Props, ActorSystem, ActorRef}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManager
+import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
 
 class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
 LocalFlinkMiniCluster(userConfiguration) {
@@ -52,6 +52,10 @@ LocalFlinkMiniCluster(userConfiguration) {
     super.generateConfiguration(config)
   }
 
+  override def getTaskManagerAkkaConfigString(index: Int): String = {
+    super.getTaskManagerAkkaConfigString(index) + TestingUtils.getTestingSerializationBindings
+  }
+
   override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
     val config = configuration.clone()
 


Mime
View raw message