flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/5] flink git commit: [FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint
Date Mon, 02 Oct 2017 20:29:44 GMT
[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

This closes #4737.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/172a64c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/172a64c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/172a64c1

Branch: refs/heads/master
Commit: 172a64c1488bd6edda97473562c6871ae7f3364d
Parents: aae417f
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Sep 26 18:39:15 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Oct 2 19:58:28 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  20 ++
 .../executiongraph/AccessExecutionGraph.java    |   2 +
 .../executiongraph/ArchivedExecutionGraph.java  |  24 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   3 +-
 .../rest/handler/AbstractRestHandler.java       |   2 +-
 .../runtime/rest/handler/RedirectHandler.java   |   8 +-
 .../job/AbstractExecutionGraphHandler.java      |  78 +++++++
 .../rest/handler/job/JobConfigHandler.java      |  75 ++++++
 .../AbstractExecutionGraphRequestHandler.java   |   2 +-
 .../rest/handler/legacy/JobConfigHandler.java   |  17 +-
 .../runtime/rest/messages/JobConfigHeaders.java |  70 ++++++
 .../runtime/rest/messages/JobConfigInfo.java    | 229 +++++++++++++++++++
 .../rest/messages/JobMessageParameters.java     |  40 ++++
 .../ArchivedExecutionGraphTest.java             |   7 +-
 .../legacy/messages/JobConfigInfoTest.java      |  52 +++++
 15 files changed, 601 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c092ea0..150490d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -27,11 +27,13 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
@@ -42,6 +44,7 @@ import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -68,6 +71,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	private final RestHandlerConfiguration restConfiguration;
 	private final Executor executor;
 
+	private final ExecutionGraphCache executionGraphCache;
+
 	public DispatcherRestEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<DispatcherGateway> leaderRetriever,
@@ -79,6 +84,10 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
 		this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
 		this.executor = Preconditions.checkNotNull(executor);
+
+		this.executionGraphCache = new ExecutionGraphCache(
+			restConfiguration.getTimeout(),
+			Time.milliseconds(restConfiguration.getRefreshInterval()));
 	}
 
 	@Override
@@ -131,6 +140,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			timeout,
 			JobTerminationHeaders.getInstance());
 
+		JobConfigHandler jobConfigHandler = new JobConfigHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			JobConfigHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -151,6 +168,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
 		handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
+		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(
@@ -163,6 +181,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	public void shutdown(Time timeout) {
 		super.shutdown(timeout);
 
+		executionGraphCache.close();
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index cab3c92..ebc0768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -69,6 +69,7 @@ public interface AccessExecutionGraph {
 	 *
 	 * @return failure causing exception, or null
 	 */
+	@Nullable
 	ErrorInfo getFailureCause();
 
 	/**
@@ -131,6 +132,7 @@ public interface AccessExecutionGraph {
 	 *
 	 * @return execution config summary for this execution graph, or null in case of errors
 	 */
+	@Nullable
 	ArchivedExecutionConfig getArchivedExecutionConfig();
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 7f857f9..4481e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	 * The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure
 	 */
+	@Nullable
 	private final ErrorInfo failureCause;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
@@ -93,7 +95,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 			List<ArchivedExecutionJobVertex> verticesInCreationOrder,
 			long[] stateTimestamps,
 			JobStatus state,
-			ErrorInfo failureCause,
+			@Nullable ErrorInfo failureCause,
 			String jsonPlan,
 			StringifiedAccumulatorResult[] archivedUserAccumulators,
 			Map<String, SerializedValue<Object>> serializedUserAccumulators,
@@ -102,17 +104,17 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 			@Nullable CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
 			@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) {
 
-		this.jobID = jobID;
-		this.jobName = jobName;
-		this.tasks = tasks;
-		this.verticesInCreationOrder = verticesInCreationOrder;
-		this.stateTimestamps = stateTimestamps;
-		this.state = state;
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.jobName = Preconditions.checkNotNull(jobName);
+		this.tasks = Preconditions.checkNotNull(tasks);
+		this.verticesInCreationOrder = Preconditions.checkNotNull(verticesInCreationOrder);
+		this.stateTimestamps = Preconditions.checkNotNull(stateTimestamps);
+		this.state = Preconditions.checkNotNull(state);
 		this.failureCause = failureCause;
-		this.jsonPlan = jsonPlan;
-		this.archivedUserAccumulators = archivedUserAccumulators;
-		this.serializedUserAccumulators = serializedUserAccumulators;
-		this.archivedExecutionConfig = executionConfig;
+		this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+		this.archivedUserAccumulators = Preconditions.checkNotNull(archivedUserAccumulators);
+		this.serializedUserAccumulators = Preconditions.checkNotNull(serializedUserAccumulators);
+		this.archivedExecutionConfig = Preconditions.checkNotNull(executionConfig);
 		this.isStoppable = isStoppable;
 		this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
 		this.checkpointStatsSnapshot = checkpointStatsSnapshot;

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 56733ba..3130ddd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1223,7 +1223,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	/**
-	 * Returns the serializable ArchivedExecutionConfig
+	 * Returns the serializable {@link ArchivedExecutionConfig}.
+	 *
 	 * @return ArchivedExecutionConfig which may be null in case of errors
 	 */
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 3469e6f..81ed9cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -68,7 +68,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 
 	protected AbstractRestHandler(
 			CompletableFuture<String> localRestAddress,
-			GatewayRetriever<T> leaderRetriever,
+			GatewayRetriever<? extends T> leaderRetriever,
 			Time timeout,
 			MessageHeaders<R, P, M> messageHeaders) {
 		super(localRestAddress, leaderRetriever, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
index dfede98..d1feabb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -58,7 +58,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 
 	protected final CompletableFuture<String> localAddressFuture;
 
-	protected final GatewayRetriever<T> leaderRetriever;
+	protected final GatewayRetriever<? extends T> leaderRetriever;
 
 	protected final Time timeout;
 
@@ -66,7 +66,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 
 	protected RedirectHandler(
 			@Nonnull CompletableFuture<String> localAddressFuture,
-			@Nonnull GatewayRetriever<T> leaderRetriever,
+			@Nonnull GatewayRetriever<? extends T> leaderRetriever,
 			@Nonnull Time timeout) {
 		this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture);
 		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
@@ -97,10 +97,10 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 			}
 
 			try {
-				OptionalConsumer<T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
+				OptionalConsumer<? extends T> optLeaderConsumer = OptionalConsumer.of(leaderRetriever.getNow());
 
 				optLeaderConsumer.ifPresent(
-					(T gateway) -> {
+					gateway -> {
 						CompletableFuture<Optional<String>> optRedirectAddressFuture = HandlerRedirectUtils.getRedirectAddress(
 							localAddress,
 							gateway,

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
new file mode 100644
index 0000000..efe3758
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for all {@link AccessExecutionGraph} based REST handlers.
+ *
+ * @param <R> response type
+ */
+public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, JobMessageParameters> {
+
+	private final ExecutionGraphCache executionGraphCache;
+
+	private final Executor executor;
+
+	protected AbstractExecutionGraphHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, R, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders);
+
+		this.executionGraphCache = Preconditions.checkNotNull(executionGraphCache);
+		this.executor = Preconditions.checkNotNull(executor);
+	}
+
+	@Override
+	protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+		JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+
+		CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway);
+
+		return executionGraphFuture.thenApplyAsync(
+			this::handleRequest,
+			executor);
+	}
+
+	protected abstract R handleRequest(AccessExecutionGraph executionGraph);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
new file mode 100644
index 0000000..bbe4eef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobConfigInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler serving the job configuration.
+ */
+public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo> {
+
+	public JobConfigHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, JobConfigInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			messageHeaders,
+			executionGraphCache,
+			executor);
+	}
+
+	@Override
+	protected JobConfigInfo handleRequest(AccessExecutionGraph executionGraph) {
+		final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
+		final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
+
+		if (executionConfig != null) {
+			executionConfigInfo = new JobConfigInfo.ExecutionConfigInfo(
+				executionConfig.getExecutionMode(),
+				executionConfig.getRestartStrategyDescription(),
+				executionConfig.getParallelism(),
+				executionConfig.getObjectReuseEnabled(),
+				executionConfig.getGlobalJobParameters());
+		} else {
+			executionConfigInfo = null;
+		}
+
+		return new JobConfigInfo(executionGraph.getJobID(), executionGraph.getJobName(), executionConfigInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
index 7fbde15..b98508a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 					throw new CompletionException(new NotFoundException("Could not find job " + jid + '.'));
 				})
 			.thenComposeAsync(
-				(AccessExecutionGraph executionGraph) -> handleRequest(executionGraph, pathParams));
+				(AccessExecutionGraph executionGraph) -> handleRequest(executionGraph, pathParams), executor);
 	}
 
 	public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
index 2d40496..a05dbb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.messages.JobConfigInfo;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
@@ -84,23 +85,23 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
 		gen.writeStartObject();
-		gen.writeStringField("jid", graph.getJobID().toString());
-		gen.writeStringField("name", graph.getJobName());
+		gen.writeStringField(JobConfigInfo.FIELD_NAME_JOB_ID, graph.getJobID().toString());
+		gen.writeStringField(JobConfigInfo.FIELD_NAME_JOB_NAME, graph.getJobName());
 
 		final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();
 
 		if (summary != null) {
-			gen.writeObjectFieldStart("execution-config");
+			gen.writeObjectFieldStart(JobConfigInfo.FIELD_NAME_EXECUTION_CONFIG);
 
-			gen.writeStringField("execution-mode", summary.getExecutionMode());
+			gen.writeStringField(JobConfigInfo.ExecutionConfigInfo.FIELD_NAME_EXECUTION_MODE, summary.getExecutionMode());
 
-			gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription());
-			gen.writeNumberField("job-parallelism", summary.getParallelism());
-			gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled());
+			gen.writeStringField(JobConfigInfo.ExecutionConfigInfo.FIELD_NAME_RESTART_STRATEGY, summary.getRestartStrategyDescription());
+			gen.writeNumberField(JobConfigInfo.ExecutionConfigInfo.FIELD_NAME_PARALLELISM, summary.getParallelism());
+			gen.writeBooleanField(JobConfigInfo.ExecutionConfigInfo.FIELD_NAME_OBJECT_REUSE_MODE, summary.getObjectReuseEnabled());
 
 			Map<String, String> ucVals = summary.getGlobalJobParameters();
 			if (ucVals != null) {
-				gen.writeObjectFieldStart("user-config");
+				gen.writeObjectFieldStart(JobConfigInfo.ExecutionConfigInfo.FIELD_NAME_GLOBAL_JOB_PARAMETERS);
 
 				for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
 					gen.writeStringField(ucVal.getKey(), ucVal.getValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java
new file mode 100644
index 0000000..fba4f20
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobConfigHandler}.
+ */
+public class JobConfigHeaders implements MessageHeaders<EmptyRequestBody, JobConfigInfo, JobMessageParameters> {
+
+	private static final JobConfigHeaders INSTANCE = new JobConfigHeaders();
+
+	public static final String URL = "/jobs/:jobid/config";
+
+	private JobConfigHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobConfigInfo> getResponseClass() {
+		return JobConfigInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobConfigHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java
new file mode 100644
index 0000000..3b0ac2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobConfigHandler}.
+ *
+ * <p>The JSON representation consists of the job id ({@code jid}), the job name ({@code name})
+ * and, if present, the {@code execution-config} object. Marshalling is done by the nested
+ * {@link Serializer} and {@link Deserializer}.
+ */
+@JsonSerialize(using = JobConfigInfo.Serializer.class)
+@JsonDeserialize(using = JobConfigInfo.Deserializer.class)
+public class JobConfigInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_JOB_ID = "jid";
+	public static final String FIELD_NAME_JOB_NAME = "name";
+	public static final String FIELD_NAME_EXECUTION_CONFIG = "execution-config";
+
+	private final JobID jobId;
+
+	private final String jobName;
+
+	/** Execution configuration of the job; {@code null} if none is available. */
+	@Nullable
+	private final ExecutionConfigInfo executionConfigInfo;
+
+	/**
+	 * Creates the job config response.
+	 *
+	 * @param jobId id of the job, must not be null
+	 * @param jobName name of the job, must not be null
+	 * @param executionConfigInfo execution configuration of the job, may be null
+	 */
+	public JobConfigInfo(
+			JobID jobId,
+			String jobName,
+			@Nullable ExecutionConfigInfo executionConfigInfo) {
+		this.jobId = Preconditions.checkNotNull(jobId);
+		this.jobName = Preconditions.checkNotNull(jobName);
+		this.executionConfigInfo = executionConfigInfo;
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	@Nullable
+	public ExecutionConfigInfo getExecutionConfigInfo() {
+		return executionConfigInfo;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobConfigInfo that = (JobConfigInfo) o;
+		return Objects.equals(jobId, that.jobId) &&
+			Objects.equals(jobName, that.jobName) &&
+			Objects.equals(executionConfigInfo, that.executionConfigInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(jobId, jobName, executionConfigInfo);
+	}
+
+	//---------------------------------------------------------------------------------
+	// Static helper classes
+	//---------------------------------------------------------------------------------
+
+	/**
+	 * Json serializer for the {@link JobConfigInfo}.
+	 *
+	 * <p>Writes {@code jid} and {@code name} unconditionally and {@code execution-config}
+	 * only if the execution config is not null.
+	 */
+	public static final class Serializer extends StdSerializer<JobConfigInfo> {
+
+		private static final long serialVersionUID = -1551666039618928811L;
+
+		public Serializer() {
+			super(JobConfigInfo.class);
+		}
+
+		@Override
+		public void serialize(
+				JobConfigInfo jobConfigInfo,
+				JsonGenerator jsonGenerator,
+				SerializerProvider serializerProvider) throws IOException {
+			jsonGenerator.writeStartObject();
+
+			jsonGenerator.writeStringField(FIELD_NAME_JOB_ID, jobConfigInfo.getJobId().toString());
+			jsonGenerator.writeStringField(FIELD_NAME_JOB_NAME, jobConfigInfo.getJobName());
+
+			// the execution config is optional and is left out entirely when absent
+			if (jobConfigInfo.getExecutionConfigInfo() != null) {
+				jsonGenerator.writeObjectField(FIELD_NAME_EXECUTION_CONFIG, jobConfigInfo.getExecutionConfigInfo());
+			}
+
+			jsonGenerator.writeEndObject();
+		}
+	}
+
+	/**
+	 * Json deserializer for the {@link JobConfigInfo}.
+	 *
+	 * <p>The fields {@code jid} and {@code name} are mandatory, {@code execution-config} is optional.
+	 */
+	public static final class Deserializer extends StdDeserializer<JobConfigInfo> {
+
+		private static final long serialVersionUID = -3580088509877177213L;
+
+		public Deserializer() {
+			super(JobConfigInfo.class);
+		}
+
+		@Override
+		public JobConfigInfo deserialize(
+				JsonParser jsonParser,
+				DeserializationContext deserializationContext) throws IOException {
+			JsonNode rootNode = jsonParser.readValueAsTree();
+
+			// fail with a mapping error instead of a NullPointerException if a mandatory field is missing
+			final JobID jobId = JobID.fromHexString(getRequiredText(jsonParser, rootNode, FIELD_NAME_JOB_ID));
+			final String jobName = getRequiredText(jsonParser, rootNode, FIELD_NAME_JOB_NAME);
+
+			final ExecutionConfigInfo executionConfigInfo;
+
+			if (rootNode.has(FIELD_NAME_EXECUTION_CONFIG)) {
+				executionConfigInfo = RestMapperUtils.getStrictObjectMapper().treeToValue(rootNode.get(FIELD_NAME_EXECUTION_CONFIG), ExecutionConfigInfo.class);
+			} else {
+				executionConfigInfo = null;
+			}
+
+			return new JobConfigInfo(jobId, jobName, executionConfigInfo);
+		}
+
+		/**
+		 * Returns the textual value of a mandatory field.
+		 *
+		 * @param jsonParser parser used to report the location of the problem
+		 * @param rootNode node expected to contain the field
+		 * @param fieldName name of the mandatory field
+		 * @return textual value of the field
+		 * @throws JsonMappingException if the field is missing or explicitly null
+		 */
+		private static String getRequiredText(
+				JsonParser jsonParser,
+				JsonNode rootNode,
+				String fieldName) throws JsonMappingException {
+			final JsonNode fieldNode = rootNode.get(fieldName);
+
+			if (fieldNode == null || fieldNode.isNull()) {
+				throw JsonMappingException.from(jsonParser, "Missing required field '" + fieldName + "' in the job config JSON.");
+			}
+
+			return fieldNode.asText();
+		}
+	}
+
+	/**
+	 * Nested class to encapsulate the execution configuration.
+	 *
+	 * <p>The JSON field names are given by the {@code FIELD_NAME_*} constants via the
+	 * {@link JsonProperty} annotations, so the Java field names are not part of the JSON format.
+	 */
+	public static final class ExecutionConfigInfo {
+
+		public static final String FIELD_NAME_EXECUTION_MODE = "execution-mode";
+		public static final String FIELD_NAME_RESTART_STRATEGY = "restart-strategy";
+		public static final String FIELD_NAME_PARALLELISM = "job-parallelism";
+		public static final String FIELD_NAME_OBJECT_REUSE_MODE = "object-reuse-mode";
+		public static final String FIELD_NAME_GLOBAL_JOB_PARAMETERS = "user-config";
+
+		@JsonProperty(FIELD_NAME_EXECUTION_MODE)
+		private final String executionMode;
+
+		@JsonProperty(FIELD_NAME_RESTART_STRATEGY)
+		private final String restartStrategy;
+
+		@JsonProperty(FIELD_NAME_PARALLELISM)
+		private final int parallelism;
+
+		@JsonProperty(FIELD_NAME_OBJECT_REUSE_MODE)
+		private final boolean isObjectReuse;
+
+		@JsonProperty(FIELD_NAME_GLOBAL_JOB_PARAMETERS)
+		private final Map<String, String> globalJobParameters;
+
+		/**
+		 * Creates the execution config info.
+		 *
+		 * @param executionMode execution mode, must not be null
+		 * @param restartStrategy description of the restart strategy, must not be null
+		 * @param parallelism parallelism of the job
+		 * @param isObjectReuse value of the object reuse mode flag
+		 * @param globalJobParameters global job parameters, must not be null
+		 */
+		@JsonCreator
+		public ExecutionConfigInfo(
+				@JsonProperty(FIELD_NAME_EXECUTION_MODE) String executionMode,
+				@JsonProperty(FIELD_NAME_RESTART_STRATEGY) String restartStrategy,
+				@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+				@JsonProperty(FIELD_NAME_OBJECT_REUSE_MODE) boolean isObjectReuse,
+				@JsonProperty(FIELD_NAME_GLOBAL_JOB_PARAMETERS) Map<String, String> globalJobParameters) {
+			this.executionMode = Preconditions.checkNotNull(executionMode);
+			this.restartStrategy = Preconditions.checkNotNull(restartStrategy);
+			this.parallelism = parallelism;
+			this.isObjectReuse = isObjectReuse;
+			this.globalJobParameters = Preconditions.checkNotNull(globalJobParameters);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ExecutionConfigInfo that = (ExecutionConfigInfo) o;
+			return parallelism == that.parallelism &&
+				isObjectReuse == that.isObjectReuse &&
+				Objects.equals(executionMode, that.executionMode) &&
+				Objects.equals(restartStrategy, that.restartStrategy) &&
+				Objects.equals(globalJobParameters, that.globalJobParameters);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(executionMode, restartStrategy, parallelism, isObjectReuse, globalJobParameters);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
new file mode 100644
index 0000000..9d74c95
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} made up of a single job id path parameter and no query parameters.
+ */
+public class JobMessageParameters extends MessageParameters {
+
+	private final JobIDPathParameter jobIdParameter = new JobIDPathParameter();
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.<MessageQueryParameter<?>>emptySet();
+	}
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.<MessagePathParameter<?>>singleton(jobIdParameter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 81d5df9..0d7c8e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -41,10 +41,11 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -64,7 +65,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class ArchivedExecutionGraphTest {
+public class ArchivedExecutionGraphTest extends TestLogger {
 	private static JobVertexID v1ID = new JobVertexID();
 	private static JobVertexID v2ID = new JobVertexID();
 
@@ -142,6 +143,8 @@ public class ArchivedExecutionGraphTest {
 
 		Execution executionWithAccumulators = runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
 
+		runtimeGraph.setJsonPlan("{}");
+
 		runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose."));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/172a64c1/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java
new file mode 100644
index 0000000..2223d3d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.JobConfigInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Marshalling round-trip test for {@link JobConfigInfo}; the instance under test
+ * carries a fully populated execution config.
+ */
+public class JobConfigInfoTest extends RestResponseMarshallingTestBase<JobConfigInfo> {
+
+	@Override
+	protected JobConfigInfo getTestResponseInstance() {
+		// arbitrary entries for the global job parameters ("user-config")
+		final Map<String, String> userConfig = new HashMap<>(3);
+		userConfig.put("foo", "bar");
+		userConfig.put("bar", "foo");
+		userConfig.put("hi", "ho");
+
+		// arguments: execution mode, restart strategy, parallelism, object reuse, global job parameters
+		final JobConfigInfo.ExecutionConfigInfo executionConfig =
+			new JobConfigInfo.ExecutionConfigInfo("foobar", "always", 42, false, userConfig);
+
+		return new JobConfigInfo(new JobID(), "testJob", executionConfig);
+	}
+
+	@Override
+	protected Class<JobConfigInfo> getTestResponseClass() {
+		return JobConfigInfo.class;
+	}
+}


Mime
View raw message