flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] flink git commit: [FLINK-4485] close and remove user class loader after job completion
Date Sat, 24 Sep 2016 11:48:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 40c978b04 -> cd43dd59e


[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is no longer
referenced. Note that the class loader is only unreferenced once its
job has been removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassLoader after job completion. This requires us to keep a
serializable copy of all data that would otherwise need the user class
loader after job completion, e.g. to display data on the web interface.

This closes #2499


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd43dd59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd43dd59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd43dd59

Branch: refs/heads/master
Commit: cd43dd59e248766627c35f90038b2202ed9e52dc
Parents: 1e4b7eb
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Sep 14 11:00:58 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Sat Sep 24 13:45:50 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/JobWithJars.java       |  4 +-
 .../webmonitor/handlers/JobConfigHandler.java   | 47 +++++-------
 .../apache/flink/runtime/client/JobClient.java  |  4 +-
 .../librarycache/BlobLibraryCacheManager.java   | 44 ++++++------
 .../librarycache/FlinkUserCodeClassLoader.java  | 35 +++++++++
 .../runtime/executiongraph/ExecutionGraph.java  | 34 +++++++++
 .../archive/ExecutionConfigSummary.java         | 75 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  2 +-
 9 files changed, 193 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index ef02527..d5a3014 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 
 /**
  * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
@@ -134,6 +134,6 @@ public class JobWithJars {
 		for (int i = 0; i < classpaths.size(); i++) {
 			urls[i + jars.size()] = classpaths.get(i);
 		}
-		return new URLClassLoader(urls, parent);
+		return new FlinkUserCodeClassLoader(urls, parent);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index cd63630..75389b1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -22,8 +22,8 @@ import java.io.StringWriter;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 /**
@@ -45,37 +45,28 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStringField("jid", graph.getJobID().toString());
 		gen.writeStringField("name", graph.getJobName());
 
-		ExecutionConfig ec;
-		try {
-			ec = graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Couldn't deserialize ExecutionConfig.", e);
-		}
+		final ExecutionConfigSummary summary = graph.getExecutionConfigSummary();
 
-		if (ec != null) {
+		if (summary != null) {
 			gen.writeObjectFieldStart("execution-config");
-			
-			gen.writeStringField("execution-mode", ec.getExecutionMode().name());
-
-			final String restartStrategyDescription = ec.getRestartStrategy() != null ? ec.getRestartStrategy().getDescription()
: "default";
-			gen.writeStringField("restart-strategy", restartStrategyDescription);
-			gen.writeNumberField("job-parallelism", ec.getParallelism());
-			gen.writeBooleanField("object-reuse-mode", ec.isObjectReuseEnabled());
-
-			ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
-			if (uc != null) {
-				Map<String, String> ucVals = uc.toMap();
-				if (ucVals != null) {
-					gen.writeObjectFieldStart("user-config");
-					
-					for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
-						gen.writeStringField(ucVal.getKey(), ucVal.getValue());
-					}
-
-					gen.writeEndObject();
+
+			gen.writeStringField("execution-mode", summary.getExecutionMode());
+
+			gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription());
+			gen.writeNumberField("job-parallelism", summary.getParallelism());
+			gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled());
+
+			Map<String, String> ucVals = summary.getGlobalJobParameters();
+			if (ucVals != null) {
+				gen.writeObjectFieldStart("user-config");
+
+				for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
+					gen.writeStringField(ucVal.getKey(), ucVal.getValue());
 				}
+
+				gen.writeEndObject();
 			}
-			
+
 			gen.writeEndObject();
 		}
 		gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 4e916eb..7e7ed9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -54,7 +55,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
@@ -228,7 +228,7 @@ public class JobClient {
 				allURLs[pos++] = url;
 			}
 
-			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
+			return new FlinkUserCodeClassLoader(allURLs, JobClient.class.getClassLoader());
 		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
 			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID
+ " not found");
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index d1fbc70..21c6b4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.execution.librarycache;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -138,8 +138,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 					count++;
 				}
 
-				URLClassLoader classLoader = new FlinkUserCodeClassLoader(urls);
-				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
+				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, urls, task));
 			}
 			else {
 				entry.register(task, requiredJarFiles);
@@ -156,14 +155,14 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
 		Preconditions.checkNotNull(jobId, "The JobId must not be null.");
 		Preconditions.checkNotNull(task, "The task execution id must not be null.");
-		
+
 		synchronized (lockObject) {
-			LibraryCacheEntry entry = cacheEntries.get(jobId);
-			
+			LibraryCacheEntry entry = cacheEntries.remove(jobId);
+
 			if (entry != null) {
 				if (entry.unregister(task)) {
-					cacheEntries.remove(jobId);
-					
+					entry.releaseClassLoader();
+
 					for (BlobKey key : entry.getLibraries()) {
 						unregisterReferenceToBlobKey(key);
 					}
@@ -286,17 +285,17 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	 */
 	private static class LibraryCacheEntry {
 		
-		private final ClassLoader classLoader;
+		private final FlinkUserCodeClassLoader classLoader;
 		
 		private final Set<ExecutionAttemptID> referenceHolders;
 		
 		private final Set<BlobKey> libraries;
 		
 		
-		public LibraryCacheEntry(Collection<BlobKey> libraries, ClassLoader classLoader,
ExecutionAttemptID initialReference) {
-			this.classLoader = classLoader;
-			this.libraries = new HashSet<BlobKey>(libraries);
-			this.referenceHolders = new HashSet<ExecutionAttemptID>();
+		public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] libraryURLs, ExecutionAttemptID
initialReference) {
+			this.classLoader = new FlinkUserCodeClassLoader(libraryURLs);
+			this.libraries = new HashSet<>(libraries);
+			this.referenceHolders = new HashSet<>();
 			this.referenceHolders.add(initialReference);
 		}
 		
@@ -326,15 +325,18 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 		public int getNumberOfReferenceHolders() {
 			return referenceHolders.size();
 		}
-	}
-
-	/**
-	 * Give the URLClassLoader a nicer name for debugging purposes.
-	 */
-	private static class FlinkUserCodeClassLoader extends URLClassLoader {
 
-		public FlinkUserCodeClassLoader(URL[] urls) {
-			super(urls, FlinkUserCodeClassLoader.class.getClassLoader());
+		/**
+		 * Release the class loader to ensure any file descriptors are closed
+		 * and the cached libraries are deleted immediately.
+		 */
+		void releaseClassLoader() {
+			try {
+				classLoader.close();
+			} catch (IOException e) {
+				LOG.warn("Failed to release user code class loader for " + Arrays.toString(libraries.toArray()));
+			}
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
new file mode 100644
index 0000000..015f6c7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.execution.librarycache;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Gives the URLClassLoader a nicer name for debugging purposes.
+ */
+public class FlinkUserCodeClassLoader extends URLClassLoader {
+
+	public FlinkUserCodeClassLoader(URL[] urls) {
+		this(urls, FlinkUserCodeClassLoader.class.getClassLoader());
+	}
+
+	public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
+		super(urls, parent);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 585e9f3..c3cf297 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -222,6 +224,9 @@ public class ExecutionGraph {
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
+	/** Serializable summary of all job config values, e.g. for web interface */
+	private ExecutionConfigSummary executionConfigSummary;
+
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -301,6 +306,16 @@ public class ExecutionGraph {
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
 
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+
+		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
+		try {
+			ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
+			if (executionConfig != null) {
+				this.executionConfigSummary = new ExecutionConfigSummary(executionConfig);
+			}
+		} catch (IOException | ClassNotFoundException e) {
+			LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -933,10 +948,29 @@ public class ExecutionGraph {
 		jobStatusListeners.clear();
 		executionListeners.clear();
 
+		if (userClassLoader instanceof FlinkUserCodeClassLoader) {
+			try {
+				// close the classloader to free space of user jars immediately
+				// otherwise we have to wait until garbage collection
+				((FlinkUserCodeClassLoader) userClassLoader).close();
+			} catch (IOException e) {
+				LOG.warn("Failed to close the user classloader for job {}", jobID, e);
+			}
+		}
+		userClassLoader = null;
+
 		isArchived = true;
 	}
 
 	/**
+	 * Returns the serializable ExecutionConfigSummary
+	 * @return ExecutionConfigSummary which may be null in case of errors
+	 */
+	public ExecutionConfigSummary getExecutionConfigSummary() {
+		return executionConfigSummary;
+	}
+
+	/**
 	 * Returns the serialized {@link ExecutionConfig}.
 	 *
 	 * @return ExecutionConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
new file mode 100644
index 0000000..ad4677f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph.archive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Serializable class which is created when archiving the job.
+ * It can be used to display job information on the web interface
+ * without having to keep the classloader around after job completion.
+ */
+public class ExecutionConfigSummary implements Serializable {
+
+	private final String executionMode;
+	private final String restartStrategyDescription;
+	private final int parallelism;
+	private final boolean objectReuseEnabled;
+	private final Map<String, String> globalJobParameters;
+
+	public ExecutionConfigSummary(ExecutionConfig ec) {
+		executionMode = ec.getExecutionMode().name();
+		if (ec.getRestartStrategy() != null) {
+			restartStrategyDescription = ec.getRestartStrategy().getDescription();
+		} else {
+			restartStrategyDescription = "default";
+		}
+		parallelism = ec.getParallelism();
+		objectReuseEnabled = ec.isObjectReuseEnabled();
+		if (ec.getGlobalJobParameters() != null
+				&& ec.getGlobalJobParameters().toMap() != null) {
+			globalJobParameters = ec.getGlobalJobParameters().toMap();
+		} else {
+			globalJobParameters = Collections.emptyMap();
+		}
+	}
+
+	public String getExecutionMode() {
+		return executionMode;
+	}
+
+	public String getRestartStrategyDescription() {
+		return restartStrategyDescription;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public boolean getObjectReuseEnabled() {
+		return objectReuseEnabled;
+	}
+
+	public Map<String, String> getGlobalJobParameters() {
+		return globalJobParameters;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 9994b7d..f7634cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -476,6 +476,7 @@ public class Task implements Runnable {
 		Map<String, Future<Path>> distributedCacheEntries = new HashMap<String,
Future<Path>>();
 		AbstractInvokable invokable = null;
 
+		ClassLoader userCodeClassLoader = null;
 		try {
 			// ----------------------------
 			//  Task Bootstrap - We periodically
@@ -486,7 +487,7 @@ public class Task implements Runnable {
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
 
-			final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache);
+			userCodeClassLoader = createUserCodeClassloader(libraryCache);
 			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
 
 			if (executionConfig.getTaskCancellationInterval() >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cd43dd59/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index a8482ac..3b0c364 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -250,7 +250,7 @@ public class WebFrontendITCase extends TestLogger {
 			assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
 			assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable streaming test job\"," +
 					"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\","
+
-					"\"job-parallelism\":-1,\"object-reuse-mode\":false}}", response.getContent());
+					"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
 		}
 	}
 


Mime
View raw message