flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/3] flink git commit: [FLINK-3701] enable reuse of ExecutionConfig
Date Fri, 13 May 2016 16:06:47 GMT
[FLINK-3701] enable reuse of ExecutionConfig

Depending on the context, the ExecutionConfig's type fields may be
deserialized using either a custom class loader or the default class
loader. The ExecutionConfig may be explicitly serialized for the Task,
or shipped inside the PojoSerializer, where it is serialized or directly
passed in local mode. Because an ExecutionConfig may be reused, its
fields can't be set to null after it has been shipped once.

The entire ExecutionConfig is now serialized upon setting it on the
JobGraph. It is no longer passed through the JobGraph's constructor but
set explicitly via JobGraph#setExecutionConfig. If no ExecutionConfig
has been set, the default is used. Unlike before, no code may modify the
ExecutionConfig after it has been set on the JobGraph: the JobGraph
holds a serialized copy, so later modifications have no effect on it.

This closes #1913


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48b469ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48b469ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48b469ad

Branch: refs/heads/master
Commit: 48b469ad4f0da466b347071cea82913965645de3
Parents: 099fdfa
Author: Maximilian Michels <mxm@apache.org>
Authored: Mon May 2 11:41:05 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri May 13 18:01:38 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |  98 +----------
 .../flink/api/common/ExecutionConfigTest.java   |  27 +++
 .../plantranslate/JobGraphGenerator.java        |  14 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   8 +-
 .../BackPressureStatsTrackerITCase.java         |   2 +-
 .../StackTraceSampleCoordinatorITCase.java      |   2 +-
 .../deployment/TaskDeploymentDescriptor.java    |  14 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  16 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  65 +++----
 .../apache/flink/runtime/taskmanager/Task.java  |  30 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  23 +--
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../client/JobClientActorRecoveryITCase.java    |   2 +-
 .../runtime/client/JobClientActorTest.java      |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   8 +-
 .../ExecutionGraphConstructionTest.java         |  20 +--
 .../ExecutionGraphDeploymentTest.java           |  10 +-
 .../ExecutionGraphRestartTest.java              |  49 +++---
 .../ExecutionGraphSignalsTest.java              |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   7 +-
 .../ExecutionStateProgressTest.java             | 176 +++++++++----------
 .../executiongraph/LocalInputSplitsTest.java    |  10 +-
 .../executiongraph/PointwisePatternTest.java    |  28 +--
 .../TerminalStateDeadlockTest.java              |   4 +-
 .../VertexLocationConstraintTest.java           |  26 +--
 .../executiongraph/VertexSlotSharingTest.java   |   4 +-
 .../PartialConsumePipelinedResultTest.java      |   3 +-
 .../flink/runtime/jobgraph/JobGraphTest.java    |  13 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java    |   2 +-
 .../runtime/jobmanager/JobManagerTest.java      |   6 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   4 +-
 .../SlotCountExceedingParallelismTest.java      |   2 +-
 .../StandaloneSubmittedJobGraphStoreTest.java   |   2 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |   2 +-
 .../ScheduleOrUpdateConsumersTest.java          |   1 -
 .../LeaderChangeJobRecoveryTest.java            |   5 +-
 .../LeaderChangeStateCleanupTest.java           |   2 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   5 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |   2 +-
 .../runtime/taskmanager/TaskCancelTest.java     |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  32 ++--
 .../flink/runtime/taskmanager/TaskStopTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   5 +-
 .../TaskManagerLossFailsTasksTest.scala         |   8 +-
 .../jobmanager/CoLocationConstraintITCase.scala |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  46 +++--
 .../runtime/jobmanager/RecoveryITCase.scala     |   9 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |   5 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |   4 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../streaming/api/RestartStrategyTest.java      |  10 +-
 .../graph/StreamingJobGraphGeneratorTest.java   |  14 +-
 .../partitioner/RescalePartitionerTest.java     |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   5 +-
 .../JobSubmissionFailsITCase.java               |   7 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   3 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   2 +-
 .../runtime/NetworkStackThroughputITCase.java   |   2 +-
 .../ZooKeeperLeaderElectionITCase.java          |   3 +-
 .../flink/test/web/WebFrontendITCase.java       |   5 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   6 +-
 .../JobManagerLeaderSessionIDITSuite.scala      |   2 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  20 +--
 65 files changed, 437 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index c27ee74..d27760f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,11 +22,10 @@ import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.util.SerializedValue;
 
 
-import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -128,7 +127,7 @@ public class ExecutionConfig implements Serializable {
 
 	// ------------------------------- User code values --------------------------------------------
 
-	private transient GlobalJobParameters globalJobParameters;
+	private GlobalJobParameters globalJobParameters;
 
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
@@ -145,22 +144,6 @@ public class ExecutionConfig implements Serializable {
 
 	private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
 
-	// ----------------------- Helper values for serialized user objects ---------------------------
-
-	private SerializedValue<GlobalJobParameters> serializedGlobalJobParameters;
-
-	private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedRegisteredTypesWithKryoSerializers;
-
-	private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedRegisteredTypesWithKryoSerializerClasses;
-
-	private SerializedValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> serializedDefaultKryoSerializers;
-
-	private SerializedValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> serializedDefaultKryoSerializerClasses;
-
-	private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredKryoTypes;
-
-	private SerializedValue<LinkedHashSet<Class<?>>> serializedRegisteredPojoTypes;
-
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -695,79 +678,6 @@ public class ExecutionConfig implements Serializable {
 		this.autoTypeRegistrationEnabled = false;
 	}
 
-	/**
-	 * Deserializes user code objects given a user code class loader
-	 *
-	 * @param userCodeClassLoader User code class loader
-	 * @throws IOException Thrown if an IOException occurs while loading the classes
-	 * @throws ClassNotFoundException Thrown if the given class cannot be loaded
-	 */
-	public void deserializeUserCode(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
-		if (serializedRegisteredKryoTypes != null) {
-			registeredKryoTypes = serializedRegisteredKryoTypes.deserializeValue(userCodeClassLoader);
-		} else {
-			registeredKryoTypes = new LinkedHashSet<>();
-		}
-
-		if (serializedRegisteredPojoTypes != null) {
-			registeredPojoTypes = serializedRegisteredPojoTypes.deserializeValue(userCodeClassLoader);
-		} else {
-			registeredPojoTypes = new LinkedHashSet<>();
-		}
-
-		if (serializedRegisteredTypesWithKryoSerializerClasses != null) {
-			registeredTypesWithKryoSerializerClasses = serializedRegisteredTypesWithKryoSerializerClasses.deserializeValue(userCodeClassLoader);
-		} else {
-			registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
-		}
-
-		if (serializedRegisteredTypesWithKryoSerializers != null) {
-			registeredTypesWithKryoSerializers = serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
-		} else {
-			registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
-		}
-
-		if (serializedDefaultKryoSerializers != null) {
-			defaultKryoSerializers = serializedDefaultKryoSerializers.deserializeValue(userCodeClassLoader);
-		} else {
-			defaultKryoSerializers = new LinkedHashMap<>();
-
-		}
-
-		if (serializedDefaultKryoSerializerClasses != null) {
-			defaultKryoSerializerClasses = serializedDefaultKryoSerializerClasses.deserializeValue(userCodeClassLoader);
-		} else {
-			defaultKryoSerializerClasses = new LinkedHashMap<>();
-		}
-
-		if (serializedGlobalJobParameters != null) {
-			globalJobParameters = serializedGlobalJobParameters.deserializeValue(userCodeClassLoader);
-		}
-	}
-
-	public void serializeUserCode() throws IOException {
-		serializedRegisteredKryoTypes = new SerializedValue<>(registeredKryoTypes);
-		registeredKryoTypes = null;
-
-		serializedRegisteredPojoTypes = new SerializedValue<>(registeredPojoTypes);
-		registeredPojoTypes = null;
-
-		serializedRegisteredTypesWithKryoSerializerClasses = new SerializedValue<>(registeredTypesWithKryoSerializerClasses);
-		registeredTypesWithKryoSerializerClasses = null;
-
-		serializedRegisteredTypesWithKryoSerializers = new SerializedValue<>(registeredTypesWithKryoSerializers);
-		registeredTypesWithKryoSerializers = null;
-
-		serializedDefaultKryoSerializers = new SerializedValue<>(defaultKryoSerializers);
-		defaultKryoSerializers = null;
-
-		serializedDefaultKryoSerializerClasses = new SerializedValue<>(defaultKryoSerializerClasses);
-		defaultKryoSerializerClasses = null;
-
-		serializedGlobalJobParameters = new SerializedValue<>(globalJobParameters);
-		globalJobParameters = null;
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {
@@ -854,10 +764,10 @@ public class ExecutionConfig implements Serializable {
 		 * Convert UserConfig into a {@code Map<String, String>} representation.
 		 * This can be used by the runtime, for example for presenting the user config in the web frontend.
 		 *
-		 * @return Key/Value representation of the UserConfig, or null.
+		 * @return Key/Value representation of the UserConfig
 		 */
 		public Map<String, String> toMap() {
-			return null;
+			return Collections.emptyMap();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 158d971..103e06f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.api.common;
 
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -74,4 +76,29 @@ public class ExecutionConfigTest {
 
 		assertEquals(parallelism, config.getParallelism());
 	}
+
+	/**
+	 * Helper function to create a new ExecutionConfig for tests.
+	 * @return A serialized ExecutionConfig
+	 */
+	public static SerializedValue<ExecutionConfig> getSerializedConfig() {
+		try {
+			return new SerializedValue<>(new ExecutionConfig());
+		} catch (IOException e) {
+			throw new RuntimeException("Couldn't create new ExecutionConfig for test.", e);
+		}
+	}
+
+	/**
+	 * Deserializes the given ExecutionConfig with the System class loader.
+	 * @param serializedConfig The serialized ExecutionConfig
+	 * @return ExecutionConfig
+	 */
+	public static ExecutionConfig deserializeConfig(SerializedValue<ExecutionConfig> serializedConfig) {
+		try {
+			return serializedConfig.deserializeValue(ExecutionConfigTest.class.getClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Could not deserialize ExecutionConfig for test.", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 696a05d..a5ae00c 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,7 +83,6 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -215,7 +214,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// ----------- finalize the job graph -----------
 
 		// create the job graph object
-		JobGraph graph = new JobGraph(jobId, program.getJobName(), program.getOriginalPlan().getExecutionConfig());
+		JobGraph graph = new JobGraph(jobId, program.getJobName());
+		graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
 
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
@@ -243,18 +243,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		this.iterations = null;
 		this.iterationStack = null;
 
-		try {
-			// make sure that we can send the ExecutionConfig using the system class loader
-			graph.getExecutionConfig().serializeUserCode();
-		} catch (IOException e) {
-			throw new CompilerException("Could not serialize the user code object in the " +
-				"ExecutionConfig.", e);
-		}
-		
 		// return job graph
 		return graph;
 	}
-	
+
 	/**
 	 * This methods implements the pre-visiting during a depth-first traversal. It create the job vertex and
 	 * sets local strategy.

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 0f2f514..cd63630 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -45,7 +45,13 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStringField("jid", graph.getJobID().toString());
 		gen.writeStringField("name", graph.getJobName());
 
-		ExecutionConfig ec = graph.getExecutionConfig();
+		ExecutionConfig ec;
+		try {
+			ec = graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't deserialize ExecutionConfig.", e);
+		}
+
 		if (ec != null) {
 			gen.writeObjectFieldStart("execution-config");
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 1f0b2ef..25dc189 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -93,7 +93,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);
 
 			// The JobGraph
-			final JobGraph jobGraph = new JobGraph(new ExecutionConfig());
+			final JobGraph jobGraph = new JobGraph();
 			final int parallelism = 4;
 
 			final JobVertex task = new JobVertex("Task");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index c6ce315..9b1f608 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -77,7 +77,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 			final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);
 
 			// The JobGraph
-			final JobGraph jobGraph = new JobGraph(new ExecutionConfig());
+			final JobGraph jobGraph = new JobGraph();
 			final int parallelism = 1;
 
 			final JobVertex task = new JobVertex("Task");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 948f6af..2b1c224 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -90,7 +90,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	private final SerializedValue<StateHandle<?>> operatorState;
 
 	/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
-	private final ExecutionConfig executionConfig;
+	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
 	private long recoveryTimestamp;
 		
@@ -101,7 +101,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			JobID jobID,
 			JobVertexID vertexID,
 			ExecutionAttemptID executionId,
-			ExecutionConfig executionConfig,
+			SerializedValue<ExecutionConfig> serializedExecutionConfig,
 			String taskName,
 			int indexInSubtaskGroup,
 			int numberOfSubtasks,
@@ -125,7 +125,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.jobID = checkNotNull(jobID);
 		this.vertexID = checkNotNull(vertexID);
 		this.executionId = checkNotNull(executionId);
-		this.executionConfig = checkNotNull(executionConfig);
+		this.serializedExecutionConfig = checkNotNull(serializedExecutionConfig);
 		this.taskName = checkNotNull(taskName);
 		this.indexInSubtaskGroup = indexInSubtaskGroup;
 		this.numberOfSubtasks = numberOfSubtasks;
@@ -146,7 +146,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		JobID jobID,
 		JobVertexID vertexID,
 		ExecutionAttemptID executionId,
-		ExecutionConfig executionConfig,
+		SerializedValue<ExecutionConfig> serializedExecutionConfig,
 		String taskName,
 		int indexInSubtaskGroup,
 		int numberOfSubtasks,
@@ -164,7 +164,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			jobID,
 			vertexID,
 			executionId,
-			executionConfig,
+			serializedExecutionConfig,
 			taskName,
 			indexInSubtaskGroup,
 			numberOfSubtasks,
@@ -185,8 +185,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 * Returns the execution configuration (see {@link ExecutionConfig}) related to the
 	 * specific job.
 	 */
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
+	public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+		return serializedExecutionConfig;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3796402..5dae785 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -175,7 +175,7 @@ public class ExecutionGraph implements Serializable {
 	// ------ Configuration of the Execution -------
 
 	/** The execution configuration (see {@link ExecutionConfig}) related to this specific job. */
-	private ExecutionConfig executionConfig;
+	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
 	 * to deploy them immediately. */
@@ -245,7 +245,7 @@ public class ExecutionGraph implements Serializable {
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
-			ExecutionConfig config,
+			SerializedValue<ExecutionConfig> serializedConfig,
 			FiniteDuration timeout,
 			RestartStrategy restartStrategy) {
 		this(
@@ -253,7 +253,7 @@ public class ExecutionGraph implements Serializable {
 			jobId,
 			jobName,
 			jobConfig,
-			config,
+			serializedConfig,
 			timeout,
 			restartStrategy,
 			new ArrayList<BlobKey>(),
@@ -267,7 +267,7 @@ public class ExecutionGraph implements Serializable {
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
-			ExecutionConfig config,
+			SerializedValue<ExecutionConfig> serializedConfig,
 			FiniteDuration timeout,
 			RestartStrategy restartStrategy,
 			List<BlobKey> requiredJarFiles,
@@ -301,7 +301,7 @@ public class ExecutionGraph implements Serializable {
 		this.requiredJarFiles = requiredJarFiles;
 		this.requiredClasspaths = requiredClasspaths;
 
-		this.executionConfig = checkNotNull(config);
+		this.serializedExecutionConfig = checkNotNull(serializedConfig);
 
 		this.timeout = timeout;
 
@@ -962,12 +962,12 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	/**
-	 * Returns the {@link ExecutionConfig}.
+	 * Returns the serialized {@link ExecutionConfig}.
 	 *
 	 * @return ExecutionConfig
 	 */
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
+	public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+		return serializedExecutionConfig;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4d27423..cbc47a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -667,7 +667,7 @@ public class ExecutionVertex implements Serializable {
 			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
 		}
 
-		ExecutionConfig config = getExecutionGraph().getExecutionConfig();
+		SerializedValue<ExecutionConfig> serializedConfig = getExecutionGraph().getSerializedExecutionConfig();
 		List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
 		List<URL> classpaths = getExecutionGraph().getRequiredClasspaths();
 
@@ -675,7 +675,7 @@ public class ExecutionVertex implements Serializable {
 			getJobId(),
 			getJobvertexId(),
 			executionId,
-			config,
+			serializedConfig,
 			getTaskName(),
 			subTaskIndex,
 			getTotalNumberOfParallelSubtasks(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index b7c6551..b3e3739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -96,79 +98,66 @@ public class JobGraph implements Serializable {
 	private List<URL> classpaths = Collections.emptyList();
 
 	/** Job specific execution config */
-	private ExecutionConfig executionConfig;
+	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Constructs a new job graph with no name, a random job ID, and the given
-	 * {@link ExecutionConfig}.
-	 *
-	 * @param config The {@link ExecutionConfig} for the job.
-	 */
-	public JobGraph(ExecutionConfig config) {
-		this(null, config);
-	}
-
-	/**
 	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig},
-	 * and a random job ID.
+	 * and a random job ID. The ExecutionConfig will be serialized and can't be modified afterwards.
 	 *
 	 * @param jobName The name of the job.
-	 * @param config The execution configuration of the job.
 	 */
-	public JobGraph(String jobName, ExecutionConfig config) {
-		this(null, jobName, config);
+	public JobGraph(String jobName) {
+		this(null, jobName);
 	}
 
 	/**
 	 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
 	 * the given name and the given execution configuration (see {@link ExecutionConfig}).
+	 * The ExecutionConfig will be serialized and can't be modified afterwards.
 	 *
 	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 	 * @param jobName The name of the job.
-	 * @param config The execution configuration of the job.
 	 */
-	public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
+	public JobGraph(JobID jobId, String jobName) {
 		this.jobID = jobId == null ? new JobID() : jobId;
 		this.jobName = jobName == null ? "(unnamed job)" : jobName;
-		this.executionConfig = config == null ? new ExecutionConfig() : config;
+		setExecutionConfig(new ExecutionConfig());
 	}
 
 	/**
 	 * Constructs a new job graph with no name, a random job ID, the given {@link ExecutionConfig}, and
-	 * the given job vertices.
+	 * the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards.
 	 *
-	 * @param config The execution configuration of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(ExecutionConfig config, JobVertex... vertices) {
-		this(null, config, vertices);
+	public JobGraph(JobVertex... vertices) {
+		this(null, vertices);
 	}
 
 	/**
 	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig}, a random job ID,
-	 * and the given job vertices.
+	 * and the given job vertices. The ExecutionConfig will be serialized and can't be modified afterwards.
 	 *
 	 * @param jobName The name of the job.
-	 * @param config The execution configuration of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(String jobName, ExecutionConfig config, JobVertex... vertices) {
-		this(null, jobName, config, vertices);
+	public JobGraph(String jobName, JobVertex... vertices) {
+		this(null, jobName, vertices);
 	}
 
 	/**
 	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig},
 	 * the given jobId or a random one if null supplied, and the given job vertices.
+	 * The ExecutionConfig will be serialized and can't be modified afterwards.
 	 *
 	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 	 * @param jobName The name of the job.
-	 * @param config The execution configuration of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(JobID jobId, String jobName, ExecutionConfig config, JobVertex... vertices) {
-		this(jobId, jobName, config);
+	public JobGraph(JobID jobId, String jobName, JobVertex... vertices) {
+		this(jobId, jobName);
 
 		for (JobVertex vertex : vertices) {
 			addVertex(vertex);
@@ -210,8 +199,8 @@ public class JobGraph implements Serializable {
 	 *
 	 * @return ExecutionConfig
 	 */
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
+	public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
+		return serializedExecutionConfig;
 	}
 
 	/**
@@ -249,6 +238,20 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
+	 * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
+	 * object will not affect this serialized copy.
+	 * @param executionConfig The ExecutionConfig to be serialized.
+	 */
+	public void setExecutionConfig(ExecutionConfig executionConfig) {
+		Preconditions.checkNotNull(executionConfig, "ExecutionConfig must not be null.");
+		try {
+			this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
+		} catch (IOException e) {
+			throw new RuntimeException("Could not serialize ExecutionConfig.", e);
+		}
+	}
+
+	/**
 	 * Adds a new task vertex to the job graph if it is not already included.
 	 *
 	 * @param vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1ae0053..251673f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -219,11 +219,11 @@ public class Task implements Runnable {
 
 	private volatile long recoveryTs;
 
-	/** The job specific execution configuration (see {@link ExecutionConfig}). */
-	private final ExecutionConfig executionConfig;
+	/** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
+	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
-	/** Interval between two successive task cancellation attempts */
-	private final long taskCancellationInterval;
+	/** Initialized from the Flink configuration; overridden by the ExecutionConfig if it specifies a non-negative interval */
+	private long taskCancellationInterval;
 
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
@@ -253,7 +253,11 @@ public class Task implements Runnable {
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
 		this.operatorState = tdd.getOperatorState();
 		this.recoveryTs = tdd.getRecoveryTimestamp();
-		this.executionConfig = checkNotNull(tdd.getExecutionConfig());
+		this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
+
+		this.taskCancellationInterval = jobConfiguration.getLong(
+			ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
+			ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
 
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
@@ -271,15 +275,6 @@ public class Task implements Runnable {
 
 		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
 
-		if (executionConfig.getTaskCancellationInterval() < 0) {
-			taskCancellationInterval = jobConfiguration.getLong(
-				ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
-				ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS);
-		} else {
-			taskCancellationInterval = executionConfig.getTaskCancellationInterval();
-		}
-
-
 		// create the reader and writer structures
 
 		final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
@@ -467,9 +462,14 @@ public class Task implements Runnable {
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
+
 			final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache);
+			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
 
-			executionConfig.deserializeUserCode(userCodeClassLoader);
+			if (executionConfig.getTaskCancellationInterval() >= 0) {
+				// override task cancellation interval from Flink config if set in ExecutionConfig
+				taskCancellationInterval = executionConfig.getTaskCancellationInterval();
+			}
 
 			// now load the task's invokable code
 			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d8b8a01..3c633f3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -28,7 +28,6 @@ import akka.actor._
 import akka.pattern.ask
 
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
@@ -46,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory}
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
@@ -1069,11 +1068,14 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val restartStrategy = Option(jobGraph.getExecutionConfig().getRestartStrategy())
-          .map(RestartStrategyFactory.createRestartStrategy(_)) match {
-            case Some(strategy) => strategy
-            case None => restartStrategyFactory.createRestartStrategy()
-          }
+        val restartStrategy =
+          Option(jobGraph.getSerializedExecutionConfig()
+            .deserializeValue(userCodeLoader)
+            .getRestartStrategy())
+              .map(RestartStrategyFactory.createRestartStrategy(_)) match {
+                case Some(strategy) => strategy
+                case None => restartStrategyFactory.createRestartStrategy()
+              }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
@@ -1088,7 +1090,7 @@ class JobManager(
               jobGraph.getJobID,
               jobGraph.getName,
               jobGraph.getJobConfiguration,
-              jobGraph.getExecutionConfig,
+              jobGraph.getSerializedExecutionConfig,
               timeout,
               restartStrategy,
               jobGraph.getUserJarBlobKeys,
@@ -1197,12 +1199,13 @@ class JobManager(
               new SimpleCheckpointStatsTracker(historySize, ackVertices)
             }
 
-          val jobParallelism = jobGraph.getExecutionConfig.getParallelism()
+          val jobParallelism = jobGraph.getSerializedExecutionConfig
+            .deserializeValue(userCodeLoader).getParallelism()
 
           val parallelism = if (jobParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
             numSlots
           } else {
-            jobGraph.getExecutionConfig.getParallelism
+            jobParallelism
           }
 
           executionGraph.enableSnapshotCheckpointing(

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 03ff83d..91a83b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -61,7 +61,7 @@ public class CoordinatorShutdownTest {
 			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 			
-			JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
+			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
 					5000, 60000, 0L, Integer.MAX_VALUE));
 			
@@ -113,7 +113,7 @@ public class CoordinatorShutdownTest {
 			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 
-			JobGraph testGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
+			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
 					5000, 60000, 0L, Integer.MAX_VALUE));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 965556f..a801348 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -50,7 +50,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			new JobID(),
 			"test",
 			new Configuration(),
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			new FiniteDuration(1, TimeUnit.DAYS),
 			new NoRestartStrategy(),
 			Collections.<BlobKey>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index 865760e..cc1994a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -95,7 +95,7 @@ public class JobClientActorRecoveryITCase extends TestLogger {
 		JobVertex blockingVertex = new JobVertex("Blocking Vertex");
 		blockingVertex.setInvokableClass(BlockingTask.class);
 		blockingVertex.setParallelism(1);
-		final JobGraph jobGraph = new JobGraph("Blocking Test Job", new ExecutionConfig(), blockingVertex);
+		final JobGraph jobGraph = new JobGraph("Blocking Test Job", blockingVertex);
 		final Promise<JobExecutionResult> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
 
 		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index ee1fd60..073164c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
 public class JobClientActorTest extends TestLogger {
 
 	private static ActorSystem system;
-	private static JobGraph testJobGraph = new JobGraph("Test Job", new ExecutionConfig());
+	private static JobGraph testJobGraph = new JobGraph("Test Job");
 
 	@BeforeClass
 	public static void setup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 63e62bf..36744a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 public class TaskDeploymentDescriptorTest {
@@ -55,7 +57,7 @@ public class TaskDeploymentDescriptorTest {
 			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
 			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
 			final List<URL> requiredClasspaths = new ArrayList<URL>(0);
-			final ExecutionConfig executionConfig = new ExecutionConfig();
+			final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig();
 
 			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId,
 				executionConfig, taskName, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
@@ -78,9 +80,7 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
-			// load serialized values in ExecutionConfig
-			copy.getExecutionConfig().deserializeUserCode(getClass().getClassLoader());
-			assertEquals(orig.getExecutionConfig(), copy.getExecutionConfig());
+			assertEquals(orig.getSerializedExecutionConfig(), copy.getSerializedExecutionConfig());
 
 			assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
 			assertEquals(orig.getRequiredClasspaths(), copy.getRequiredClasspaths());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index d845d01..8eebe66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -107,7 +107,7 @@ public class ExecutionGraphConstructionTest {
 			jobId, 
 			jobName, 
 			cfg,
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -152,7 +152,7 @@ public class ExecutionGraphConstructionTest {
 			jobId, 
 			jobName, 
 			cfg, 
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -220,7 +220,7 @@ public class ExecutionGraphConstructionTest {
 			jobId, 
 			jobName, 
 			cfg, 
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -475,7 +475,7 @@ public class ExecutionGraphConstructionTest {
 			jobId, 
 			jobName, 
 			cfg, 
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -532,7 +532,7 @@ public class ExecutionGraphConstructionTest {
 			jobId, 
 			jobName, 
 			cfg, 
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -594,7 +594,7 @@ public class ExecutionGraphConstructionTest {
 				jobId, 
 				jobName, 
 				cfg, 
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy());
 			try {
@@ -640,7 +640,7 @@ public class ExecutionGraphConstructionTest {
 				jobId, 
 				jobName,
 				cfg, 
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy());
 
@@ -705,14 +705,14 @@ public class ExecutionGraphConstructionTest {
 			JobVertex v8 = new JobVertex("vertex8");
 			v8.setParallelism(2);
 
-			JobGraph jg = new JobGraph(jobId, jobName, new ExecutionConfig(), v1, v2, v3, v4, v5, v6, v7, v8);
+			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(), 
 				jobId, 
 				jobName, 
 				cfg, 
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7a9cee7..d126acb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,7 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -84,8 +84,8 @@ public class ExecutionGraphDeploymentTest {
 				TestingUtils.defaultExecutionContext(), 
 				jobId, 
 				"some job", 
-				new Configuration(), 
-				new ExecutionConfig(),
+				new Configuration(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy());
 
@@ -289,7 +289,7 @@ public class ExecutionGraphDeploymentTest {
 			jobId, 
 			"some job", 
 			new Configuration(),
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		
@@ -332,4 +332,4 @@ public class ExecutionGraphDeploymentTest {
 			throw new Exception();
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0837927..01cca5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
@@ -73,14 +74,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		sender.setInvokableClass(Tasks.NoOpInvokable.class);
 		sender.setParallelism(NUM_TASKS);
 
-		JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender);
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
 		ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(),
 				new JobID(),
 				"test job",
 				new Configuration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy());
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -129,13 +130,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		groupVertex.setStrictlyCoLocatedWith(groupVertex2);
 		
 		//initiate and schedule job
-		JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), groupVertex, groupVertex2);
+		JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2);
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"test job",
 			new Configuration(),
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new FixedDelayRestartStrategy(1, 0L));
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -184,14 +185,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		sender.setInvokableClass(Tasks.NoOpInvokable.class);
 		sender.setParallelism(NUM_TASKS);
 
-		JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender);
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
 		ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(),
 				new JobID(),
 				"Test job",
 				new Configuration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new FixedDelayRestartStrategy(1, 1000));
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -220,7 +221,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 				new JobID(),
 				"TestJob",
 				new Configuration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				// We want to manually control the restart and delay
 				new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -229,7 +230,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 		jobVertex.setParallelism(NUM_TASKS);
 
-		JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex);
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
 
 		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -279,7 +280,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 				new JobID(),
 				"TestJob",
 				new Configuration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				// We want to manually control the restart and delay
 				new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -295,7 +296,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 		jobVertex.setParallelism(NUM_TASKS);
 
-		JobGraph jobGraph = new JobGraph("TestJob", new ExecutionConfig(), jobVertex);
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
 
 		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -355,14 +356,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		sender.setInvokableClass(Tasks.NoOpInvokable.class);
 		sender.setParallelism(NUM_TASKS);
 
-		JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender);
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
 		ExecutionGraph eg = spy(new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"Test job",
 			new Configuration(),
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new FixedDelayRestartStrategy(1, 1000)));
 
@@ -426,14 +427,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		receiver.setInvokableClass(Tasks.NoOpInvokable.class);
 		receiver.setParallelism(1);
 
-		JobGraph jobGraph = new JobGraph("Pointwise job", new ExecutionConfig(), sender, receiver);
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"test job",
 			new Configuration(),
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new FixedDelayRestartStrategy(1, 1000));
 
@@ -518,16 +519,18 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		vertex.setInvokableClass(Tasks.NoOpInvokable.class);
 		vertex.setParallelism(1);
 
-		JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex);
-		jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(
-				Integer.MAX_VALUE, Integer.MAX_VALUE));
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+			Integer.MAX_VALUE, Integer.MAX_VALUE));
+		JobGraph jobGraph = new JobGraph("Test Job", vertex);
+		jobGraph.setExecutionConfig(executionConfig);
 
 		ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(),
 				new JobID(),
 				"test job",
 				new Configuration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new FixedDelayRestartStrategy(1, 1000000));
 
@@ -570,16 +573,18 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		vertex.setInvokableClass(Tasks.NoOpInvokable.class);
 		vertex.setParallelism(1);
 
-		JobGraph jobGraph = new JobGraph("Test Job", new ExecutionConfig(), vertex);
-		jobGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(
-				Integer.MAX_VALUE, Integer.MAX_VALUE));
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+			Integer.MAX_VALUE, Integer.MAX_VALUE));
+		JobGraph jobGraph = new JobGraph("Test Job", vertex);
+		jobGraph.setExecutionConfig(executionConfig);
 
 		ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(),
 				new JobID(),
 				"test job",
 				new Configuration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				AkkaUtils.getDefaultTimeout(),
 				new FixedDelayRestartStrategy(1, 1000000));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index d1bb680..8b04fa3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
@@ -128,7 +128,7 @@ public class ExecutionGraphSignalsTest {
 			jobId,
 			jobName,
 			cfg,
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		eg.attachJobGraph(ordered);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 6659b5a..92a7402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -22,10 +22,11 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -176,7 +177,7 @@ public class ExecutionGraphTestUtils {
 			new JobID(), 
 			"test job", 
 			new Configuration(), 
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 
@@ -197,7 +198,7 @@ public class ExecutionGraphTestUtils {
 		return ejv;
 	}
 	
-	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
+	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException, IOException {
 		return getExecutionVertex(id, TestingUtils.defaultExecutionContext());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 1ff90e1..9e4aa6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -1,88 +1,88 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-	@Test
-	public void testAccumulatedStateFinished() {
-		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-
-			JobVertex ajv = new JobVertex("TestVertex", vid);
-			ajv.setParallelism(3);
-			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
-				jid, 
-				"test job", 
-				new Configuration(), 
-                new ExecutionConfig(),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
-			graph.attachJobGraph(Arrays.asList(ajv));
-
-			setGraphStatus(graph, JobStatus.RUNNING);
-
-			ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-			// mock resources and mock taskmanager
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				SimpleSlot slot = getInstance(
-						new SimpleActorGateway(
-								TestingUtils.defaultExecutionContext())
-				).allocateSimpleSlot(jid);
-				ee.deployToSlot(slot);
-			}
-
-			// finish all
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				ee.executionFinished();
-			}
-
-			assertTrue(ejv.isInFinalState());
-			assertEquals(JobStatus.FINISHED, graph.getState());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+
+public class ExecutionStateProgressTest {
+
+	@Test
+	public void testAccumulatedStateFinished() {
+		try {
+			final JobID jid = new JobID();
+			final JobVertexID vid = new JobVertexID();
+
+			JobVertex ajv = new JobVertex("TestVertex", vid);
+			ajv.setParallelism(3);
+			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+
+			ExecutionGraph graph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(), 
+				jid, 
+				"test job", 
+				new Configuration(),
+				ExecutionConfigTest.getSerializedConfig(),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
+			graph.attachJobGraph(Arrays.asList(ajv));
+
+			setGraphStatus(graph, JobStatus.RUNNING);
+
+			ExecutionJobVertex ejv = graph.getJobVertex(vid);
+
+			// mock resources and mock taskmanager
+			for (ExecutionVertex ee : ejv.getTaskVertices()) {
+				SimpleSlot slot = getInstance(
+						new SimpleActorGateway(
+								TestingUtils.defaultExecutionContext())
+				).allocateSimpleSlot(jid);
+				ee.deployToSlot(slot);
+			}
+
+			// finish all
+			for (ExecutionVertex ee : ejv.getTaskVertices()) {
+				ee.executionFinished();
+			}
+
+			assertTrue(ejv.isInFinalState());
+			assertEquals(JobStatus.FINISHED, graph.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index a4c86e3..d7ce0ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -266,14 +266,14 @@ public class LocalInputSplitsTest {
 			vertex.setInvokableClass(DummyInvokable.class);
 			vertex.setInputSplitSource(new TestInputSplitSource(splits));
 			
-			JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
+			JobGraph jobGraph = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(), 
 				jobGraph.getJobID(),
 				jobGraph.getName(),  
 				jobGraph.getJobConfiguration(),
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				TIMEOUT,
 				new NoRestartStrategy());
 			
@@ -331,14 +331,14 @@ public class LocalInputSplitsTest {
 		vertex.setInvokableClass(DummyInvokable.class);
 		vertex.setInputSplitSource(new TestInputSplitSource(splits));
 		
-		JobGraph jobGraph = new JobGraph("test job", new ExecutionConfig(), vertex);
+		JobGraph jobGraph = new JobGraph("test job", vertex);
 		
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
 			jobGraph.getJobID(),
 			jobGraph.getName(),  
 			jobGraph.getJobConfiguration(),
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			TIMEOUT,
 			new NoRestartStrategy());
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index cbeeded..1b369db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -64,7 +64,7 @@ public class PointwisePatternTest {
 			jobId, 
 			jobName, 
 			cfg,
-			new ExecutionConfig(),
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -105,8 +105,8 @@ public class PointwisePatternTest {
 			TestingUtils.defaultExecutionContext(), 
 			jobId, 
 			jobName, 
-			cfg, 
-			new ExecutionConfig(),
+			cfg,
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -148,8 +148,8 @@ public class PointwisePatternTest {
 			TestingUtils.defaultExecutionContext(), 
 			jobId, 
 			jobName, 
-			cfg, 
-			new ExecutionConfig(),
+			cfg,
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -192,8 +192,8 @@ public class PointwisePatternTest {
 			TestingUtils.defaultExecutionContext(), 
 			jobId, 
 			jobName,
-			cfg, 
-			new ExecutionConfig(),
+			cfg,
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -234,8 +234,8 @@ public class PointwisePatternTest {
 			TestingUtils.defaultExecutionContext(), 
 			jobId, 
 			jobName, 
-			cfg, 
-			new ExecutionConfig(),
+			cfg,
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -296,8 +296,8 @@ public class PointwisePatternTest {
 			TestingUtils.defaultExecutionContext(), 
 			jobId, 
 			jobName, 
-			cfg, 
-			new ExecutionConfig(),
+			cfg,
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {
@@ -349,8 +349,8 @@ public class PointwisePatternTest {
 			TestingUtils.defaultExecutionContext(), 
 			jobId, 
 			jobName, 
-			cfg, 
-			new ExecutionConfig(),
+			cfg,
+			ExecutionConfigTest.getSerializedConfig(),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy());
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index a28fb49..8bc474b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -188,7 +188,7 @@ public class TerminalStateDeadlockTest {
 				jobId,
 				"test graph",
 				EMPTY_CONFIG,
-				new ExecutionConfig(),
+				ExecutionConfigTest.getSerializedConfig(),
 				TIMEOUT,
 				new FixedDelayRestartStrategy(1, 0));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index d866b2f..c483f41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.DummyActorGateway;
@@ -76,14 +76,14 @@ public class VertexLocationConstraintTest {
 			JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID());
 			jobVertex.setInvokableClass(DummyInvokable.class);
 			jobVertex.setParallelism(2);
-			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex);
+			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					timeout,
 					new NoRestartStrategy());
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -149,14 +149,14 @@ public class VertexLocationConstraintTest {
 			JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID());
 			jobVertex.setInvokableClass(DummyInvokable.class);
 			jobVertex.setParallelism(2);
-			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex);
+			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					timeout,
 					new NoRestartStrategy());
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -226,14 +226,14 @@ public class VertexLocationConstraintTest {
 			jobVertex1.setSlotSharingGroup(sharingGroup);
 			jobVertex2.setSlotSharingGroup(sharingGroup);
 			
-			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex1, jobVertex2);
+			JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					timeout,
 					new NoRestartStrategy());
 			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
@@ -294,14 +294,14 @@ public class VertexLocationConstraintTest {
 			JobVertex jobVertex = new JobVertex("test vertex", new JobVertexID());
 			jobVertex.setInvokableClass(DummyInvokable.class);
 			jobVertex.setParallelism(1);
-			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex);
+			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					timeout,
 					new NoRestartStrategy());
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -360,7 +360,7 @@ public class VertexLocationConstraintTest {
 			jobVertex1.setParallelism(1);
 			jobVertex2.setParallelism(1);
 			
-			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), jobVertex1, jobVertex2);
+			JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2);
 			
 			SlotSharingGroup sharingGroup = new SlotSharingGroup();
 			jobVertex1.setSlotSharingGroup(sharingGroup);
@@ -371,7 +371,7 @@ public class VertexLocationConstraintTest {
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					timeout,
 					new NoRestartStrategy());
 			eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2));
@@ -404,14 +404,14 @@ public class VertexLocationConstraintTest {
 	public void testArchivingClearsFields() {
 		try {
 			JobVertex vertex = new JobVertex("test vertex", new JobVertexID());
-			JobGraph jg = new JobGraph("test job", new ExecutionConfig(), vertex);
+			JobGraph jg = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					timeout,
 					new NoRestartStrategy());
 			eg.attachJobGraph(Collections.singletonList(vertex));

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 5110249..7a23e26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -76,7 +76,7 @@ public class VertexSlotSharingTest {
 					new JobID(),
 					"test job",
 					new Configuration(),
-					new ExecutionConfig(),
+					ExecutionConfigTest.getSerializedConfig(),
 					AkkaUtils.getDefaultTimeout(),
 					new NoRestartStrategy());
 			eg.attachJobGraph(vertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 317eed7..af8aa69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -89,8 +89,7 @@ public class PartialConsumePipelinedResultTest {
 		receiver.connectNewDataSetAsInput(
 				sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
-		final JobGraph jobGraph = new JobGraph(
-				"Partial Consume of Pipelined Result", new ExecutionConfig(), sender, receiver);
+		final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
 
 		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
 				sender.getID(), receiver.getID());

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 68b05b2..74f1adf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.*;
 
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
@@ -32,7 +31,7 @@ public class JobGraphTest {
 	@Test
 	public void testSerialization() {
 		try {
-			JobGraph jg = new JobGraph("The graph", new ExecutionConfig());
+			JobGraph jg = new JobGraph("The graph");
 			
 			// add some configuration values
 			{
@@ -91,7 +90,7 @@ public class JobGraphTest {
 			intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE);
 			intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
 
-			JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(),
+			JobGraph graph = new JobGraph("TestGraph",
 				source1, source2, intermediate1, intermediate2, target1, target2);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
@@ -136,7 +135,7 @@ public class JobGraphTest {
 			
 			l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
 
-			JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(),
+			JobGraph graph = new JobGraph("TestGraph",
 				source1, source2, root, l11, l13, l12, l2);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
@@ -183,7 +182,7 @@ public class JobGraphTest {
 			op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
 			op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE);
 
-			JobGraph graph = new JobGraph("TestGraph", new ExecutionConfig(), source, op1, op2, op3);
+			JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
 			
 			assertEquals(4,  sorted.size());
@@ -212,7 +211,7 @@ public class JobGraphTest {
 			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
 			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
 
-			JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4);
+			JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4);
 			try {
 				jg.getVerticesSortedTopologicallyFromSources();
 				fail("Failed to raise error on topologically sorting cyclic graph.");
@@ -244,7 +243,7 @@ public class JobGraphTest {
 			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
 			target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
 
-			JobGraph jg = new JobGraph("Cyclic Graph", new ExecutionConfig(), v1, v2, v3, v4, source, target);
+			JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target);
 			try {
 				jg.getVerticesSortedTopologicallyFromSources();
 				fail("Failed to raise error on topologically sorting cyclic graph.");

http://git-wip-us.apache.org/repos/asf/flink/blob/48b469ad/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index 612f64f..d1d5f03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -67,7 +67,7 @@ public class JsonGeneratorTest {
 			sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE);
 			sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL);
 
-			JobGraph jg = new JobGraph("my job", new ExecutionConfig(), source1, source2, source3,
+			JobGraph jg = new JobGraph("my job", source1, source2, source3,
 					intermediate1, intermediate2, join1, join2, sink1, sink2);
 			
 			String plan = JsonPlanGenerator.generatePlan(jg);


Mime
View raw message