flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [08/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:18 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
new file mode 100644
index 0000000..58fda14
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+/**
+ * Marker interface for web handlers which can describe their paths.
+ */
+public interface WebHandler {
+
+	/**
+	 * Returns an array of REST URLs under which this handler can be registered.
+	 *
+	 * @return array containing REST URLs under which this handler can be registered.
+	 */
+	String[] getPaths();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
new file mode 100644
index 0000000..e214a36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on an ExecutionGraph
+ * that can be retrieved via "jobid" parameter.
+ */
+public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler {
+
+	private final ExecutionGraphHolder executionGraphHolder;
+
+	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executor);
+		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
+		String jidString = pathParams.get("jobid");
+		if (jidString == null) {
+			throw new RuntimeException("JobId parameter missing");
+		}
+
+		JobID jid;
+		try {
+			jid = JobID.fromHexString(jidString);
+		}
+		catch (Exception e) {
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
+		}
+
+		final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+
+		return graphFuture.thenComposeAsync(
+			(Optional<AccessExecutionGraph> optGraph) -> {
+				if (optGraph.isPresent()) {
+					return handleRequest(optGraph.get(), pathParams);
+				} else {
+					throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
+				}
+			}, executor);
+	}
+
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
new file mode 100644
index 0000000..e2e4484
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific job vertex (defined
+ * via the "vertexid" parameter) in a specific job (defined via the "jobid" parameter).
+ */
+public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
+
+	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public final CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		final JobVertexID vid = parseJobVertexId(params);
+
+		final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+		if (jobVertex == null) {
+			throw new IllegalArgumentException("No vertex with ID '" + vid + "' exists.");
+		}
+
+		return handleRequest(jobVertex, params);
+	}
+
+	/**
+	 * Returns the job vertex ID parsed from the provided parameters.
+	 *
+	 * @param params Path parameters
+	 * @return Parsed job vertex ID or <code>null</code> if not available.
+	 */
+	public static JobVertexID parseJobVertexId(Map<String, String> params) {
+		String jobVertexIdParam = params.get("vertexid");
+		if (jobVertexIdParam == null) {
+			return null;
+		}
+
+		try {
+			return JobVertexID.fromHexString(jobVertexIdParam);
+		} catch (RuntimeException ignored) {
+			return null;
+		}
+	}
+
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
new file mode 100644
index 0000000..43c4af3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for most request handlers. The handlers must produce a JSON response.
+ */
+public abstract class AbstractJsonRequestHandler implements RequestHandler {
+
+	private static final Charset ENCODING = Charset.forName("UTF-8");
+
+	protected final Executor executor;
+
+	protected AbstractJsonRequestHandler(Executor executor) {
+		this.executor = Preconditions.checkNotNull(executor);
+	}
+
+	@Override
+	public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+
+		return resultFuture.thenApplyAsync(
+			(String result) -> {
+				byte[] bytes = result.getBytes(ENCODING);
+
+				DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+				response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+				response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+				return response;
+			});
+	}
+
+	/**
+	 * Core method that handles the request and generates the response. The method needs to
+	 * respond with a valid JSON string. Exceptions may be thrown and will be handled.
+	 *
+	 * @param pathParams The map of REST path parameters, decoded by the router.
+	 * @param queryParams The map of query parameters.
+	 * @param jobManagerGateway to communicate with the JobManager.
+	 *
+	 * @return The JSON string that is the HTTP response.
+	 *
+	 * @throws Exception Handlers may forward exceptions. Exceptions of type
+	 *         {@link NotFoundException} will cause an HTTP 404
+	 *         response with the exception message, other exceptions will cause an HTTP 500 response
+	 *         with the exception stack trace.
+	 */
+	public abstract CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
new file mode 100644
index 0000000..ec277d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask execution attempt
+ * (defined via the "attempt" parameter) of a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
+ * specific job (defined via the "jobid" parameter).
+ */
+public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
+
+	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
+		final String attemptNumberString = params.get("attempt");
+		if (attemptNumberString == null) {
+			return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
+		}
+
+		final int attempt;
+		try {
+			attempt = Integer.parseInt(attemptNumberString);
+		}
+		catch (NumberFormatException e) {
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter"));
+		}
+
+		final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
+		if (attempt == currentAttempt.getAttemptNumber()) {
+			return handleRequest(currentAttempt, params);
+		}
+		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
+			AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
+
+			if (exec != null) {
+				return handleRequest(exec, params);
+			} else {
+				return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
+					" has already been deleted."));
+			}
+		}
+		else {
+			return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
+		}
+	}
+
+	public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
new file mode 100644
index 0000000..d69038a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
+ * specific job (defined via the "jobid" parameter).
+ */
+public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
+
+	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+		final String subtaskNumberString = params.get("subtasknum");
+		if (subtaskNumberString == null) {
+			return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
+		}
+
+		final int subtask;
+		try {
+			subtask = Integer.parseInt(subtaskNumberString);
+		}
+		catch (NumberFormatException e) {
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
+		}
+
+		if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
+			return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
+		}
+
+		final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
+		return handleRequest(vertex, params);
+	}
+
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
new file mode 100644
index 0000000..db13633
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.FlinkException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responder that returns the status of the Flink cluster, such as how many
+ * TaskManagers are currently connected, and how many jobs are running.
+ */
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
+
+	private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+
+	private static final String version = EnvironmentInformation.getVersion();
+
+	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
+
+	private final Time timeout;
+
+	public ClusterOverviewHandler(Executor executor, Time timeout) {
+		super(executor);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CLUSTER_OVERVIEW_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		// we need no parameters, get all requests
+		try {
+			if (jobManagerGateway != null) {
+				CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+				return overviewFuture.thenApplyAsync(
+					(StatusOverview overview) -> {
+						StringWriter writer = new StringWriter();
+						try {
+							JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+							gen.writeStartObject();
+							gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+							gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+							gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+							gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+							gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+							gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+							gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+							gen.writeStringField("flink-version", version);
+							if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
+								gen.writeStringField("flink-commit", commitID);
+							}
+							gen.writeEndObject();
+
+							gen.close();
+							return writer.toString();
+						} catch (IOException exception) {
+							throw new FlinkFutureException("Could not write cluster overview.", exception);
+						}
+					},
+					executor);
+			} else {
+				throw new Exception("No connection to the leading JobManager.");
+			}
+		}
+		catch (Exception e) {
+			return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
new file mode 100644
index 0000000..57214f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+/**
+ * Responder that returns a constant String.
+ */
+@ChannelHandler.Sharable
+public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
+
+	private final byte[] encodedText;
+
+	public ConstantTextHandler(String text) {
+		this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
+	}
+
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		HttpResponse response = new DefaultFullHttpResponse(
+			HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
+
+		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
+		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+
+		KeepAliveWrite.flush(ctx, routed.request(), response);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
new file mode 100644
index 0000000..07d9707
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Responder that returns with a list of all JobIDs of jobs found at the target actor.
+ * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
+ * given the JobManager or Archive Actor Reference.
+ */
+public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
+
+	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
+	/** Timeout for the gateway request and the blocking wait on its result. */
+	private final Time timeout;
+
+	public CurrentJobIdsHandler(Executor executor, Time timeout) {
+		super(executor);
+		this.timeout = requireNonNull(timeout);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CURRENT_JOB_IDS_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				// we need no parameters, get all requests
+				try {
+					if (jobManagerGateway != null) {
+						CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+						// blocking wait is acceptable here: we already run on the handler's executor
+						JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+						StringWriter writer = new StringWriter();
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+						gen.writeStartObject();
+						writeJobIdArray(gen, "jobs-running", overview.getJobsRunningOrPending());
+						writeJobIdArray(gen, "jobs-finished", overview.getJobsFinished());
+						writeJobIdArray(gen, "jobs-cancelled", overview.getJobsCancelled());
+						writeJobIdArray(gen, "jobs-failed", overview.getJobsFailed());
+						gen.writeEndObject();
+
+						gen.close();
+						return writer.toString();
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Writes the given job IDs as a JSON string array under the given field name.
+	 *
+	 * @param gen generator to write to
+	 * @param fieldName name of the JSON array field
+	 * @param jobIds job IDs to serialize as strings
+	 * @throws IOException if writing the JSON fails
+	 */
+	private static void writeJobIdArray(JsonGenerator gen, String fieldName, Iterable<JobID> jobIds) throws IOException {
+		gen.writeArrayFieldStart(fieldName);
+		for (JobID jid : jobIds) {
+			gen.writeString(jid.toString());
+		}
+		gen.writeEndArray();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
new file mode 100644
index 0000000..6f85320
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns a summary of the job status.
+ */
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
+
+	private static final String ALL_JOBS_REST_PATH = "/joboverview";
+	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
+	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
+
+	/** Timeout for the job-details request against the JobManager gateway. */
+	private final Time timeout;
+
+	// Which job categories this handler instance serves; both flags set means "/joboverview".
+	private final boolean includeRunningJobs;
+	private final boolean includeFinishedJobs;
+
+	/**
+	 * Creates a new overview handler.
+	 *
+	 * @param executor executor the JSON rendering runs on
+	 * @param timeout timeout for the job-details request
+	 * @param includeRunningJobs whether running jobs are part of the overview
+	 * @param includeFinishedJobs whether finished jobs are part of the overview
+	 */
+	public CurrentJobsOverviewHandler(
+			Executor executor,
+			Time timeout,
+			boolean includeRunningJobs,
+			boolean includeFinishedJobs) {
+
+		super(executor);
+		this.timeout = checkNotNull(timeout);
+		this.includeRunningJobs = includeRunningJobs;
+		this.includeFinishedJobs = includeFinishedJobs;
+	}
+
+	@Override
+	public String[] getPaths() {
+		// the registered path mirrors the flag combination chosen at construction time
+		if (includeRunningJobs && includeFinishedJobs) {
+			return new String[]{ALL_JOBS_REST_PATH};
+		}
+		if (includeRunningJobs) {
+			return new String[]{RUNNING_JOBS_REST_PATH};
+		} else {
+			return new String[]{COMPLETED_JOBS_REST_PATH};
+		}
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway != null) {
+			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+
+			return jobDetailsFuture.thenApplyAsync(
+				(MultipleJobsDetails result) -> {
+					// single timestamp so all duration fields in this response are consistent
+					final long now = System.currentTimeMillis();
+
+					StringWriter writer = new StringWriter();
+					try {
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+						gen.writeStartObject();
+
+						if (includeRunningJobs && includeFinishedJobs) {
+							// combined overview uses two separate arrays
+							gen.writeArrayFieldStart("running");
+							for (JobDetails detail : result.getRunningJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+
+							gen.writeArrayFieldStart("finished");
+							for (JobDetails detail : result.getFinishedJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+						} else {
+							// single-category overview flattens into one "jobs" array
+							gen.writeArrayFieldStart("jobs");
+							for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+						}
+
+						gen.writeEndObject();
+						gen.close();
+						return writer.toString();
+					} catch (IOException e) {
+						throw new FlinkFutureException("Could not write current jobs overview json.", e);
+					}
+				},
+				executor);
+		}
+		else {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+		}
+	}
+
+	/**
+	 * Archivist for the CurrentJobsOverviewHandler.
+	 */
+	public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			StringWriter writer = new StringWriter();
+			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+				gen.writeStartObject();
+				// archived graphs are never running, so this array stays empty
+				gen.writeArrayFieldStart("running");
+				gen.writeEndArray();
+				gen.writeArrayFieldStart("finished");
+				writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
+				gen.writeEndArray();
+				gen.writeEndObject();
+			}
+			String json = writer.toString();
+			String path = ALL_JOBS_REST_PATH;
+			return Collections.singleton(new ArchivedJson(path, json));
+		}
+	}
+
+	/**
+	 * Writes one job's summary (id, name, state, timing and per-state task counts)
+	 * as a JSON object into the given generator.
+	 *
+	 * @param details job details to serialize
+	 * @param gen generator to write to
+	 * @param now current timestamp, used as the end time for still-running jobs
+	 * @throws IOException if writing the JSON fails
+	 */
+	public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
+		gen.writeStartObject();
+
+		gen.writeStringField("jid", details.getJobId().toString());
+		gen.writeStringField("name", details.getJobName());
+		gen.writeStringField("state", details.getStatus().name());
+
+		gen.writeNumberField("start-time", details.getStartTime());
+		gen.writeNumberField("end-time", details.getEndTime());
+		// end time <= 0 marks a job that has not finished; its duration runs up to "now"
+		gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
+		gen.writeNumberField("last-modification", details.getLastUpdateTime());
+
+		gen.writeObjectFieldStart("tasks");
+		gen.writeNumberField("total", details.getNumTasks());
+
+		// "pending" aggregates every state before the task actually runs
+		final int[] perState = details.getNumVerticesPerExecutionState();
+		gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
+				perState[ExecutionState.SCHEDULED.ordinal()] +
+				perState[ExecutionState.DEPLOYING.ordinal()]);
+		gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
+		gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
+		gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
+		gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
+		gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
+		gen.writeEndObject();
+
+		gen.writeEndObject();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
new file mode 100644
index 0000000..e8854f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Responder that returns the parameters that define how the asynchronous requests
+ * against this web server should behave. It defines for example the refresh interval,
+ * and time zone of the server timestamps.
+ */
+public class DashboardConfigHandler extends AbstractJsonRequestHandler {
+
+	private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
+
+	/** The configuration never changes, so it is rendered once and cached. */
+	private final String configString;
+
+	public DashboardConfigHandler(Executor executor, long refreshInterval) {
+		super(executor);
+		try {
+			this.configString = createConfigJson(refreshInterval);
+		}
+		catch (Exception e) {
+			// writing JSON to an in-memory writer should never fail
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{DASHBOARD_CONFIG_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		// serve the pre-rendered string; no gateway interaction needed
+		return CompletableFuture.completedFuture(configString);
+	}
+
+	/**
+	 * Renders the dashboard configuration (refresh interval, server time zone,
+	 * Flink version and revision) as a JSON object.
+	 *
+	 * @param refreshInterval refresh interval for asynchronous dashboard requests
+	 * @return JSON representation of the dashboard configuration
+	 * @throws IOException if writing the JSON fails
+	 */
+	public static String createConfigJson(long refreshInterval) throws IOException {
+		final TimeZone serverTimeZone = TimeZone.getDefault();
+
+		final StringWriter writer = new StringWriter();
+		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+			gen.writeStartObject();
+			gen.writeNumberField("refresh-interval", refreshInterval);
+			gen.writeNumberField("timezone-offset", serverTimeZone.getRawOffset());
+			gen.writeStringField("timezone-name", serverTimeZone.getDisplayName());
+			gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
+
+			final EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+			if (revision != null) {
+				gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
+			}
+
+			gen.writeEndObject();
+		}
+
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
new file mode 100644
index 0000000..8a47e50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.WeakHashMap;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
+ *
+ * <p>The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
+ * at some point once no one else is pointing to the ExecutionGraph.
+ * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should
+ * stay valid.
+ */
+public class ExecutionGraphHolder {
+
+	// NOTE(review): not referenced anywhere in this class at the moment
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
+
+	/** Timeout used for the requestJob call against the JobManager gateway. */
+	private final Time timeout;
+
+	// NOTE(review): WeakHashMap is not thread-safe; this assumes single-threaded access
+	// or external synchronization — confirm against the callers.
+	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
+
+	public ExecutionGraphHolder(Time timeout) {
+		this.timeout = checkNotNull(timeout);
+	}
+
+	/**
+	 * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
+	 * {@link Optional#empty()} if it cannot be found.
+	 *
+	 * @param jid jobID of the execution graph to be retrieved
+	 * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
+	 */
+	public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
+		AccessExecutionGraph cached = cache.get(jid);
+		if (cached != null) {
+			// drop SUSPENDED graphs from the cache so a fresh copy is requested below
+			if (cached.getState() == JobStatus.SUSPENDED) {
+				cache.remove(jid);
+			} else {
+				return CompletableFuture.completedFuture(Optional.of(cached));
+			}
+		}
+
+		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+		// populate the cache as a side effect; without an explicit executor this callback
+		// runs on the ForkJoinPool common pool
+		executionGraphFuture.thenAcceptAsync(
+			optExecutionGraph ->
+				optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph)));
+
+		return executionGraphFuture;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
new file mode 100644
index 0000000..0a3b050
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the aggregated user accumulators of a job.
+ */
+public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
+
+	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_ACCUMULATORS_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		// render asynchronously on the handler's executor
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobAccumulatorsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job accumulators json.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Archivist for the JobAccumulatorsHandler.
+	 */
+	public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createJobAccumulatorsJson(graph);
+			String path = JOB_ACCUMULATORS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
+	/**
+	 * Renders the stringified accumulator results of the given graph as a JSON object.
+	 *
+	 * @param graph graph whose accumulators are serialized
+	 * @return JSON representation of the accumulators
+	 * @throws IOException if writing the JSON fails
+	 */
+	public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
+		final StringWriter writer = new StringWriter();
+		final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+		gen.writeStartObject();
+
+		// job-level accumulators are not exposed yet, so this array stays empty
+		gen.writeArrayFieldStart("job-accumulators");
+		gen.writeEndArray();
+
+		gen.writeArrayFieldStart("user-task-accumulators");
+		for (StringifiedAccumulatorResult result : graph.getAccumulatorResultsStringified()) {
+			gen.writeStartObject();
+			gen.writeStringField("name", result.getName());
+			gen.writeStringField("type", result.getType());
+			gen.writeStringField("value", result.getValue());
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
new file mode 100644
index 0000000..a194f30
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the CANCEL request.
+ */
+public class JobCancellationHandler extends AbstractJsonRequestHandler {
+
+	// fixed typo: previously misspelled as JOB_CONCELLATION_*; constants are private, so
+	// the rename has no external impact and the registered paths are unchanged
+	private static final String JOB_CANCELLATION_REST_PATH = "/jobs/:jobid/cancel";
+	private static final String JOB_CANCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
+
+	/** Timeout passed on to the cancellation call on the gateway. */
+	private final Time timeout;
+
+	public JobCancellationHandler(Executor executor, Time timeout) {
+		super(executor);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_CANCELLATION_REST_PATH, JOB_CANCELLATION_YARN_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+					if (jobManagerGateway != null) {
+						// the outcome of the cancellation is not awaited; the handler answers immediately
+						jobManagerGateway.cancelJob(jobId, timeout);
+						return "{}";
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to cancel the job with id: "  + pathParams.get("jobid"), e);
+				}
+			},
+			executor);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
new file mode 100644
index 0000000..23e94f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import org.apache.flink.runtime.rest.NotFoundException;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler for {@link CancelJobWithSavepoint} messages.
+ *
+ * <p>Consists of two sub-handlers: the {@link TriggerHandler}, which starts an
+ * asynchronous cancel-with-savepoint operation, and the {@link InProgressHandler},
+ * which reports the status of a previously triggered operation. Both share the
+ * {@link #inProgress} and {@link #completed} maps, which are guarded by {@link #lock}.
+ */
+public class JobCancellationWithSavepointHandlers {
+
+	private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint";
+	private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
+
+	/** URL for in-progress cancellations. */
+	private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
+
+	/** Encoding for the JSON response bodies. */
+	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+
+	/** Shared lock between Trigger and In-Progress handlers. */
+	private final Object lock = new Object();
+
+	/** In-Progress requests. Guarded by {@link #lock}. */
+	private final Map<JobID, Long> inProgress = new HashMap<>();
+
+	/** Succeeded/failed request. Either String or Throwable. Guarded by {@link #lock}. */
+	private final Map<Long, Object> completed = new HashMap<>();
+
+	/** Request counter; a plain long guarded by {@link #lock} (not atomic by itself). */
+	private long requestCounter;
+
+	/** Handler for trigger requests. */
+	private final TriggerHandler triggerHandler;
+
+	/** Handler for in-progress requests. */
+	private final InProgressHandler inProgressHandler;
+
+	/** Default savepoint directory. */
+	private final String defaultSavepointDirectory;
+
+	public JobCancellationWithSavepointHandlers(
+			ExecutionGraphHolder currentGraphs,
+			Executor executor) {
+		this(currentGraphs, executor, null);
+	}
+
+	public JobCancellationWithSavepointHandlers(
+			ExecutionGraphHolder currentGraphs,
+			Executor executor,
+			@Nullable String defaultSavepointDirectory) {
+
+		this.triggerHandler = new TriggerHandler(currentGraphs, executor);
+		this.inProgressHandler = new InProgressHandler(executor);
+		this.defaultSavepointDirectory = defaultSavepointDirectory;
+	}
+
+	public TriggerHandler getTriggerHandler() {
+		return triggerHandler;
+	}
+
+	public InProgressHandler getInProgressHandler() {
+		return inProgressHandler;
+	}
+
+	// ------------------------------------------------------------------------
+	// New requests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Handler for triggering a {@link CancelJobWithSavepoint} message.
+	 */
+	class TriggerHandler implements RequestHandler {
+
+		/** Current execution graphs. */
+		private final ExecutionGraphHolder currentGraphs;
+
+		/** Execution context for futures. */
+		private final Executor executor;
+
+		public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
+			this.currentGraphs = checkNotNull(currentGraphs);
+			this.executor = checkNotNull(executor);
+		}
+
+		@Override
+		public String[] getPaths() {
+			return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public CompletableFuture<FullHttpResponse> handleRequest(
+				Map<String, String> pathParams,
+				Map<String, String> queryParams,
+				JobManagerGateway jobManagerGateway) {
+
+			if (jobManagerGateway != null) {
+				JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+				final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
+
+				graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+
+				return graphFuture.thenApplyAsync(
+					(Optional<AccessExecutionGraph> optGraph) -> {
+						final AccessExecutionGraph graph = optGraph.orElseThrow(
+							() -> new FlinkFutureException(
+								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
+
+						// The savepoint timeout is taken from the checkpoint coordinator config.
+						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+						if (coord == null) {
+							throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+						}
+
+						// Resolve the target directory: explicit path parameter wins,
+						// otherwise fall back to the cluster-wide default (if configured).
+						String targetDirectory = pathParams.get("targetDirectory");
+						if (targetDirectory == null) {
+							if (defaultSavepointDirectory == null) {
+								throw new IllegalStateException("No savepoint directory configured. " +
+									"You can either specify a directory when triggering this savepoint or " +
+									"configure a cluster-wide default via key '" +
+									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+							} else {
+								targetDirectory = defaultSavepointDirectory;
+							}
+						}
+
+						try {
+							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+						}
+					}, executor);
+			} else {
+				return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+			}
+		}
+
+		/**
+		 * Triggers the cancellation (unless one is already in progress for this job)
+		 * and returns an HTTP 202 (Accepted) response pointing to the in-progress URL.
+		 */
+		@SuppressWarnings("unchecked")
+		private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+			// Check whether a request exists
+			final long requestId;
+			final boolean isNewRequest;
+			synchronized (lock) {
+				if (inProgress.containsKey(jobId)) {
+					requestId = inProgress.get(jobId);
+					isNewRequest = false;
+				} else {
+					requestId = ++requestCounter;
+					inProgress.put(jobId, requestId);
+					isNewRequest = true;
+				}
+			}
+
+			if (isNewRequest) {
+				boolean success = false;
+
+				try {
+					// Trigger cancellation
+					CompletableFuture<String> cancelJobFuture = jobManagerGateway
+						.cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
+
+					cancelJobFuture.whenCompleteAsync(
+						(String path, Throwable throwable) -> {
+							// The result maps are plain HashMaps shared with the
+							// InProgressHandler, which reads them under the shared
+							// lock; mutate them under the same lock here.
+							synchronized (lock) {
+								try {
+									if (throwable != null) {
+										completed.put(requestId, throwable);
+									} else {
+										completed.put(requestId, path);
+									}
+								} finally {
+									inProgress.remove(jobId);
+								}
+							}
+						}, executor);
+
+					success = true;
+				} finally {
+					// Roll back the in-progress registration if triggering failed.
+					synchronized (lock) {
+						if (!success) {
+							inProgress.remove(jobId);
+						}
+					}
+				}
+			}
+
+			// In-progress location
+			String location = CANCELLATION_IN_PROGRESS_REST_PATH
+					.replace(":jobid", jobId.toString())
+					.replace(":requestId", Long.toString(requestId));
+
+			// Accepted response
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+			gen.writeStartObject();
+			gen.writeStringField("status", "accepted");
+			gen.writeNumberField("request-id", requestId);
+			gen.writeStringField("location", location);
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					HttpResponseStatus.ACCEPTED,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.LOCATION, location);
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// In-progress requests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Handler for in-progress cancel with savepoint operations.
+	 */
+	class InProgressHandler implements RequestHandler {
+
+		/** The number of recent checkpoints whose IDs are remembered. */
+		private static final int NUM_GHOST_REQUEST_IDS = 16;
+
+		/** Remember some recently completed. */
+		private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
+
+		/** Executor to run the status lookup on (instead of the common pool). */
+		private final Executor executor;
+
+		InProgressHandler(Executor executor) {
+			this.executor = checkNotNull(executor);
+		}
+
+		@Override
+		public String[] getPaths() {
+			return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+			JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+			long requestId = Long.parseLong(pathParams.get("requestId"));
+
+			return CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						synchronized (lock) {
+							Object result = completed.remove(requestId);
+
+							if (result != null) {
+								// Add to recent history
+								recentlyCompleted.add(new Tuple2<>(requestId, result));
+								if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+									recentlyCompleted.remove();
+								}
+
+								if (result.getClass() == String.class) {
+									String savepointPath = (String) result;
+									return createSuccessResponse(requestId, savepointPath);
+								} else {
+									Throwable cause = (Throwable) result;
+									return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+								}
+							} else {
+								// Check in-progress
+								Long inProgressRequestId = inProgress.get(jobId);
+								if (inProgressRequestId != null) {
+									// Sanity check
+									if (inProgressRequestId == requestId) {
+										return createInProgressResponse(requestId);
+									} else {
+										String msg = "Request ID does not belong to JobID";
+										return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+									}
+								}
+
+								// Check recent history
+								for (Tuple2<Long, Object> recent : recentlyCompleted) {
+									if (recent.f0 == requestId) {
+										if (recent.f1.getClass() == String.class) {
+											String savepointPath = (String) recent.f1;
+											return createSuccessResponse(requestId, savepointPath);
+										} else {
+											Throwable cause = (Throwable) recent.f1;
+											return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+										}
+									}
+								}
+
+								return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+							}
+						}
+					} catch (Exception e) {
+						throw new FlinkFutureException("Could not handle in progress request.", e);
+					}
+				},
+				executor);
+		}
+
+		private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+			gen.writeStartObject();
+
+			gen.writeStringField("status", "success");
+			gen.writeNumberField("request-id", requestId);
+			gen.writeStringField("savepoint-path", savepointPath);
+
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					HttpResponseStatus.CREATED,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+
+		private FullHttpResponse createInProgressResponse(long requestId) throws IOException {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+			gen.writeStartObject();
+
+			gen.writeStringField("status", "in-progress");
+			gen.writeNumberField("request-id", requestId);
+
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					HttpResponseStatus.ACCEPTED,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+
+		private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+			gen.writeStartObject();
+
+			gen.writeStringField("status", "failed");
+			gen.writeNumberField("request-id", requestId);
+			gen.writeStringField("cause", errMsg);
+
+			gen.writeEndObject();
+			gen.close();
+
+			String json = writer.toString();
+			byte[] bytes = json.getBytes(ENCODING);
+
+			DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1,
+					code,
+					Unpooled.wrappedBuffer(bytes));
+
+			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+			return response;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
new file mode 100644
index 0000000..bb1cf8f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the execution config of a job.
+ */
+public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
+
+	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{JOB_CONFIG_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		// Serialize the config on the handler executor, not the caller thread.
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobConfigJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not write job config json.", e);
+				}
+			},
+			executor);
+
+	}
+
+	/**
+	 * Archivist for the JobConfigHandler.
+	 */
+	public static class JobConfigJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String configJson = createJobConfigJson(graph);
+			String restPath = JOB_CONFIG_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(restPath, configJson));
+		}
+	}
+
+	/**
+	 * Renders the given graph's archived execution config as a JSON string.
+	 * The "execution-config" object (and the nested "user-config") is only
+	 * written when the graph actually carries an archived config.
+	 */
+	public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
+		final StringWriter out = new StringWriter();
+		final JsonGenerator generator = JsonFactory.JACKSON_FACTORY.createGenerator(out);
+
+		generator.writeStartObject();
+		generator.writeStringField("jid", graph.getJobID().toString());
+		generator.writeStringField("name", graph.getJobName());
+
+		final ArchivedExecutionConfig executionConfig = graph.getArchivedExecutionConfig();
+		if (executionConfig != null) {
+			generator.writeObjectFieldStart("execution-config");
+
+			generator.writeStringField("execution-mode", executionConfig.getExecutionMode());
+
+			generator.writeStringField("restart-strategy", executionConfig.getRestartStrategyDescription());
+			generator.writeNumberField("job-parallelism", executionConfig.getParallelism());
+			generator.writeBooleanField("object-reuse-mode", executionConfig.getObjectReuseEnabled());
+
+			final Map<String, String> userConfig = executionConfig.getGlobalJobParameters();
+			if (userConfig != null) {
+				generator.writeObjectFieldStart("user-config");
+
+				for (Map.Entry<String, String> entry : userConfig.entrySet()) {
+					generator.writeStringField(entry.getKey(), entry.getValue());
+				}
+
+				generator.writeEndObject();
+			}
+
+			generator.writeEndObject();
+		}
+		generator.writeEndObject();
+
+		generator.close();
+		return out.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
new file mode 100644
index 0000000..dd6aee8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns details about a job. This includes:
+ * <ul>
+ *     <li>Dataflow plan</li>
+ *     <li>id, name, and current status</li>
+ *     <li>start time, end time, duration</li>
+ *     <li>number of job vertices in each state (pending, running, finished, failed)</li>
+ *     <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li>
+ * </ul>
+ */
+public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
+	private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices";
+
+	// Fetcher for externally reported metrics; not null-checked here and handed
+	// through to createJobDetailsJson, which accepts it as @Nullable.
+	private final MetricFetcher fetcher;
+
+	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+		super(executionGraphHolder, executor);
+		this.fetcher = fetcher;
+	}
+
+	@Override
+	public String[] getPaths() {
+		// Both paths serve the identical JSON document (see the archivist below).
+		return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		// Render asynchronously on the handler executor; the JSON generation
+		// walks every subtask of every vertex and can be non-trivial.
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobDetailsJson(graph, fetcher);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job details json.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Archivist for the JobDetailsHandler.
+	 */
+	public static class JobDetailsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			// Archived jobs have no live metrics, hence the null fetcher; the
+			// same JSON is stored under both REST paths.
+			String json = createJobDetailsJson(graph, null);
+			String path1 = JOB_DETAILS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			String path2 = JOB_DETAILS_VERTICES_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			Collection<ArchivedJson> archives = new ArrayList<>();
+			archives.add(new ArchivedJson(path1, json));
+			archives.add(new ArchivedJson(path2, json));
+			return archives;
+		}
+	}
+
+	/**
+	 * Renders the job detail JSON for the given graph.
+	 *
+	 * @param graph execution graph to describe
+	 * @param fetcher metric fetcher for live I/O metrics, may be null (then
+	 *                presumably only metrics stored in the graph are reported —
+	 *                TODO confirm against MutableIOMetrics.addIOMetrics)
+	 * @return JSON document as a string
+	 * @throws IOException if JSON generation fails
+	 */
+	public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
+		final StringWriter writer = new StringWriter();
+		final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+		// Single timestamp so that all duration computations in this response agree.
+		final long now = System.currentTimeMillis();
+
+		gen.writeStartObject();
+
+		// basic info
+		gen.writeStringField("jid", graph.getJobID().toString());
+		gen.writeStringField("name", graph.getJobName());
+		gen.writeBooleanField("isStoppable", graph.isStoppable());
+		gen.writeStringField("state", graph.getState().name());
+
+		// times and duration
+		final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
+		// -1 signals "still running" to consumers of this JSON
+		final long jobEndTime = graph.getState().isGloballyTerminalState() ?
+				graph.getStatusTimestamp(graph.getState()) : -1L;
+		gen.writeNumberField("start-time", jobStartTime);
+		gen.writeNumberField("end-time", jobEndTime);
+		gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
+		gen.writeNumberField("now", now);
+
+		// timestamps
+		gen.writeObjectFieldStart("timestamps");
+		for (JobStatus status : JobStatus.values()) {
+			gen.writeNumberField(status.name(), graph.getStatusTimestamp(status));
+		}
+		gen.writeEndObject();
+
+		// job vertices
+		int[] jobVerticesPerState = new int[ExecutionState.values().length];
+		gen.writeArrayFieldStart("vertices");
+
+		for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+			int[] tasksPerState = new int[ExecutionState.values().length];
+			long startTime = Long.MAX_VALUE;
+			long endTime = 0;
+			boolean allFinished = true;
+
+			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+				final ExecutionState state = vertex.getExecutionState();
+				tasksPerState[state.ordinal()]++;
+
+				// take the earliest start time
+				long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+				if (started > 0) {
+					startTime = Math.min(startTime, started);
+				}
+
+				allFinished &= state.isTerminal();
+				// latest state-transition timestamp across subtasks
+				endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+			}
+
+			// startTime == Long.MAX_VALUE means no subtask was ever deployed;
+			// all three values are reported as -1 in that case.
+			long duration;
+			if (startTime < Long.MAX_VALUE) {
+				if (allFinished) {
+					duration = endTime - startTime;
+				}
+				else {
+					// still running: end time unknown, duration measured up to 'now'
+					endTime = -1L;
+					duration = now - startTime;
+				}
+			}
+			else {
+				startTime = -1L;
+				endTime = -1L;
+				duration = -1L;
+			}
+
+			ExecutionState jobVertexState =
+					ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
+			jobVerticesPerState[jobVertexState.ordinal()]++;
+
+			gen.writeStartObject();
+			gen.writeStringField("id", ejv.getJobVertexId().toString());
+			gen.writeStringField("name", ejv.getName());
+			gen.writeNumberField("parallelism", ejv.getParallelism());
+			gen.writeStringField("status", jobVertexState.name());
+
+			gen.writeNumberField("start-time", startTime);
+			gen.writeNumberField("end-time", endTime);
+			gen.writeNumberField("duration", duration);
+
+			// per-state subtask counts for this vertex
+			gen.writeObjectFieldStart("tasks");
+			for (ExecutionState state : ExecutionState.values()) {
+				gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
+			}
+			gen.writeEndObject();
+
+			// aggregate I/O metrics over all subtasks of this vertex
+			MutableIOMetrics counts = new MutableIOMetrics();
+
+			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+				counts.addIOMetrics(
+					vertex.getCurrentExecutionAttempt(),
+					fetcher,
+					graph.getJobID().toString(),
+					ejv.getJobVertexId().toString());
+			}
+
+			counts.writeIOMetricsAsJson(gen);
+
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+
+		// per-state counts of job vertices (aggregated states from above)
+		gen.writeObjectFieldStart("status-counts");
+		for (ExecutionState state : ExecutionState.values()) {
+			gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]);
+		}
+		gen.writeEndObject();
+
+		// the plan is pre-serialized JSON, so it is written raw
+		gen.writeFieldName("plan");
+		gen.writeRawValue(graph.getJsonPlan());
+
+		gen.writeEndObject();
+
+		gen.close();
+		return writer.toString();
+	}
+}


Mime
View raw message