flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [06/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:16 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
new file mode 100644
index 0000000..95d417a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A request handler that provides an overview over all taskmanagers or details for a single one.
+ *
+ * <p>Both variants answer with a JSON object that holds a {@code taskmanagers} array. The
+ * {@code metrics} section of an entry is only written when a single task manager is requested
+ * through the {@code :taskmanagerid} path parameter.
+ */
+public class TaskManagersHandler extends AbstractJsonRequestHandler {
+
+	private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
+	private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid";
+
+	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
+
+	/** Timeout passed to the requests sent to the job manager gateway. */
+	private final Time timeout;
+
+	/** Source of the task manager metrics written for single task manager requests. */
+	private final MetricFetcher fetcher;
+
+	/**
+	 * Creates the handler.
+	 *
+	 * @param executor Executor handed to the super class and used to serialize the responses.
+	 * @param timeout  Timeout for the job manager requests; must not be null.
+	 * @param fetcher  Fetcher providing the metric store that is read for single task manager requests.
+	 */
+	public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
+		super(executor);
+		this.timeout = requireNonNull(timeout);
+		this.fetcher = fetcher;
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH};
+	}
+
+	/**
+	 * Requests either one task manager instance (if {@link #TASK_MANAGER_ID_KEY} is part of the
+	 * path parameters) or all of them and serializes the answer to JSON.
+	 *
+	 * @param pathParams        Path parameters of the request.
+	 * @param queryParams       Query parameters of the request (not read by this handler).
+	 * @param jobManagerGateway Gateway to the leading job manager, or null if there is none.
+	 * @return Future with the JSON response; completed exceptionally if no gateway is available
+	 *         or if the JSON could not be written.
+	 */
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway == null) {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+		}
+
+		// whether one task manager's metrics are requested, or all task manager, we
+		// return them in an array. This avoids unnecessary code complexity.
+		// If only one task manager is requested, we only fetch one task manager metrics.
+		if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+			InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+			CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+			return tmInstanceFuture.thenApplyAsync(
+				(Optional<Instance> optTaskManager) -> writeTaskManagersJsonUnchecked(
+					optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+					pathParams),
+				executor);
+		}
+
+		CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+		return tmInstancesFuture.thenApplyAsync(
+			(Collection<Instance> taskManagers) -> writeTaskManagersJsonUnchecked(taskManagers, pathParams),
+			executor);
+	}
+
+	/**
+	 * Serializes the instances and converts an {@link IOException} into the unchecked
+	 * {@link FlinkFutureException}, so that the method can be used inside future callbacks.
+	 */
+	private String writeTaskManagersJsonUnchecked(Collection<Instance> instances, Map<String, String> pathParams) {
+		try {
+			return writeTaskManagersJson(instances, pathParams);
+		} catch (IOException e) {
+			throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+		}
+	}
+
+	/**
+	 * Writes the given instances as the {@code taskmanagers} array of a JSON object.
+	 *
+	 * @param instances  Task manager instances to serialize.
+	 * @param pathParams Path parameters of the request; metrics are only written if they contain
+	 *                   {@link #TASK_MANAGER_ID_KEY}.
+	 * @return The JSON document as string.
+	 * @throws IOException If the JSON generator fails.
+	 */
+	private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
+		StringWriter writer = new StringWriter();
+
+		// try-with-resources closes the generator even if writing one of the fields fails
+		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+			gen.writeStartObject();
+			gen.writeArrayFieldStart("taskmanagers");
+
+			for (Instance instance : instances) {
+				gen.writeStartObject();
+				gen.writeStringField("id", instance.getId().toString());
+				gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
+				gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
+				// NOTE(review): the field name suggests an elapsed time, the value is whatever
+				// getLastHeartBeat() returns - confirm against Instance.
+				gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
+				gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
+				gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
+				gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
+				gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
+				gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
+				gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
+
+				// only send metrics when only one task manager requests them.
+				if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+					// trigger an update of the fetcher before reading the metric store
+					fetcher.update();
+					MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+					if (metrics != null) {
+						writeMetrics(gen, metrics);
+					}
+				}
+
+				gen.writeEndObject();
+			}
+
+			gen.writeEndArray();
+			gen.writeEndObject();
+		}
+
+		return writer.toString();
+	}
+
+	/**
+	 * Writes the {@code metrics} object (memory, network buffers, garbage collectors) of a
+	 * single task manager.
+	 */
+	private static void writeMetrics(JsonGenerator gen, MetricStore.TaskManagerMetricStore metrics) throws IOException {
+		gen.writeObjectFieldStart("metrics");
+
+		long heapUsed = readLongMetric(metrics, "Status.JVM.Memory.Heap.Used");
+		long heapCommitted = readLongMetric(metrics, "Status.JVM.Memory.Heap.Committed");
+		long heapTotal = readLongMetric(metrics, "Status.JVM.Memory.Heap.Max");
+
+		gen.writeNumberField("heapCommitted", heapCommitted);
+		gen.writeNumberField("heapUsed", heapUsed);
+		gen.writeNumberField("heapMax", heapTotal);
+
+		long nonHeapUsed = readLongMetric(metrics, "Status.JVM.Memory.NonHeap.Used");
+		long nonHeapCommitted = readLongMetric(metrics, "Status.JVM.Memory.NonHeap.Committed");
+		long nonHeapTotal = readLongMetric(metrics, "Status.JVM.Memory.NonHeap.Max");
+
+		gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+		gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+		gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+		gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+		gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+		gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+		long directCount = readLongMetric(metrics, "Status.JVM.Memory.Direct.Count");
+		long directUsed = readLongMetric(metrics, "Status.JVM.Memory.Direct.MemoryUsed");
+		long directMax = readLongMetric(metrics, "Status.JVM.Memory.Direct.TotalCapacity");
+
+		gen.writeNumberField("directCount", directCount);
+		gen.writeNumberField("directUsed", directUsed);
+		gen.writeNumberField("directMax", directMax);
+
+		long mappedCount = readLongMetric(metrics, "Status.JVM.Memory.Mapped.Count");
+		long mappedUsed = readLongMetric(metrics, "Status.JVM.Memory.Mapped.MemoryUsed");
+		long mappedMax = readLongMetric(metrics, "Status.JVM.Memory.Mapped.TotalCapacity");
+
+		gen.writeNumberField("mappedCount", mappedCount);
+		gen.writeNumberField("mappedUsed", mappedUsed);
+		gen.writeNumberField("mappedMax", mappedMax);
+
+		long memorySegmentsAvailable = readLongMetric(metrics, "Status.Network.AvailableMemorySegments");
+		long memorySegmentsTotal = readLongMetric(metrics, "Status.Network.TotalMemorySegments");
+
+		gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+		gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+		gen.writeArrayFieldStart("garbageCollectors");
+
+		for (String gcName : metrics.garbageCollectorNames) {
+			String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+			String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+			// a collector is only reported if both of its metrics are available
+			if (count != null && time != null) {
+				gen.writeStartObject();
+				gen.writeStringField("name", gcName);
+				gen.writeNumberField("count", Long.parseLong(count));
+				gen.writeNumberField("time", Long.parseLong(time));
+				gen.writeEndObject();
+			}
+		}
+
+		gen.writeEndArray();
+		gen.writeEndObject();
+	}
+
+	/**
+	 * Reads the metric with the given name and parses it as a long, using {@code "0"} as the
+	 * default value handed to the metric store.
+	 */
+	private static long readLongMetric(MetricStore.TaskManagerMetricStore metrics, String name) {
+		return Long.parseLong(metrics.getMetric(name, "0"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
new file mode 100644
index 0000000..96bf7ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics tracker.
+ *
+ * <p>Back pressure is determined by sampling running tasks. If a task is
+ * slowed down by back pressure it will be stuck in memory requests to a
+ * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
+ *
+ * <p>The back pressured stack traces look like this:
+ *
+ * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
+ * request
+ * [...]
+ * </pre>
+ */
+public class BackPressureStatsTracker {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);
+
+	/** Maximum stack trace depth for samples. */
+	static final int MAX_STACK_TRACE_DEPTH = 3;
+
+	/** Expected class name for back pressure indicating stack trace element. */
+	static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
+
+	/** Expected method name for back pressure indicating stack trace element. */
+	static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
+
+	/** Lock guarding trigger operations. */
+	private final Object lock = new Object();
+
+	/** Stack trace sample coordinator. */
+	private final StackTraceSampleCoordinator coordinator;
+
+	/**
+	 * Completed stats. Important: Job vertex IDs need to be scoped by job ID,
+	 * because they are potentially constant across runs messing up the cached
+	 * data.
+	 */
+	private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
+
+	/**
+	 * Pending in progress stats. Important: Job vertex IDs need to be scoped
+	 * by job ID, because they are potentially constant across runs messing up
+	 * the cached data.
+	 */
+	private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
+
+	/** Cleanup interval for completed stats cache. */
+	private final int cleanUpInterval;
+
+	/** Number of stack trace samples requested per trigger. */
+	private final int numSamples;
+
+	/** Delay between two samples of the same trigger. */
+	private final Time delayBetweenSamples;
+
+	/** Flag indicating whether the stats tracker has been shut down. */
+	private boolean shutDown;
+
+	/**
+	 * Creates a back pressure statistics tracker.
+	 *
+	 * @param coordinator         Coordinator used to trigger the stack trace samples.
+	 * @param cleanUpInterval     Clean up interval for completed stats.
+	 * @param numSamples          Number of stack trace samples when determining back pressure.
+	 * @param delayBetweenSamples Delay between samples when determining back pressure.
+	 */
+	public BackPressureStatsTracker(
+			StackTraceSampleCoordinator coordinator,
+			int cleanUpInterval,
+			int numSamples,
+			Time delayBetweenSamples) {
+
+		this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
+
+		checkArgument(cleanUpInterval >= 0, "Clean up interval");
+		this.cleanUpInterval = cleanUpInterval;
+
+		checkArgument(numSamples >= 1, "Number of samples");
+		this.numSamples = numSamples;
+
+		this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+		// the clean up interval is applied as expire-after-access time in milliseconds
+		this.operatorStatsCache = CacheBuilder.newBuilder()
+				.concurrencyLevel(1)
+				.expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+				.build();
+	}
+
+	/** Cleanup interval for completed stats cache. */
+	public long getCleanUpInterval() {
+		return cleanUpInterval;
+	}
+
+	/**
+	 * Returns back pressure statistics for an operator.
+	 *
+	 * @param vertex Operator to get the stats for.
+	 *
+	 * @return Back pressure statistics for an operator, or an empty optional
+	 * if the cache holds no stats for it
+	 */
+	public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+		return Optional.ofNullable(operatorStatsCache.getIfPresent(vertex));
+	}
+
+	/**
+	 * Triggers a stack trace sample for an operator to gather the back pressure
+	 * statistics. If there is a sample in progress for the operator, the call
+	 * is ignored.
+	 *
+	 * @param vertex Operator to get the stats for.
+	 * @return Flag indicating whether a sample was triggered.
+	 */
+	@SuppressWarnings("unchecked")
+	public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
+		synchronized (lock) {
+			if (shutDown) {
+				return false;
+			}
+
+			if (!pendingStats.contains(vertex) &&
+					!vertex.getGraph().getState().isGloballyTerminalState()) {
+
+				Executor executor = vertex.getGraph().getFutureExecutor();
+
+				// Only trigger if still active job
+				if (executor != null) {
+					pendingStats.add(vertex);
+
+					// guard kept, because building the array string is not free
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+					}
+
+					CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
+							vertex.getTaskVertices(),
+							numSamples,
+							delayBetweenSamples,
+							MAX_STACK_TRACE_DEPTH);
+
+					sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);
+
+					return true;
+				}
+			}
+
+			return false;
+		}
+	}
+
+	/**
+	 * Cleans up the operator stats cache if it contains timed out entries.
+	 *
+	 * <p>The Guava cache only evicts as maintenance during normal operations.
+	 * If this handler is inactive, it will never be cleaned.
+	 */
+	public void cleanUpOperatorStatsCache() {
+		operatorStatsCache.cleanUp();
+	}
+
+	/**
+	 * Shuts down the stats tracker.
+	 *
+	 * <p>Invalidates the cache and clears all pending stats.
+	 */
+	public void shutDown() {
+		synchronized (lock) {
+			if (!shutDown) {
+				operatorStatsCache.invalidateAll();
+				pendingStats.clear();
+
+				shutDown = true;
+			}
+		}
+	}
+
+	/**
+	 * Invalidates the cache (irrespective of clean up interval).
+	 */
+	void invalidateOperatorStatsCache() {
+		operatorStatsCache.invalidateAll();
+	}
+
+	/**
+	 * Checks whether the stack trace contains the element that indicates back
+	 * pressure, i.e. {@link #EXPECTED_CLASS_NAME}#{@link #EXPECTED_METHOD_NAME}.
+	 */
+	private static boolean isBackPressured(StackTraceElement[] trace) {
+		for (int i = trace.length - 1; i >= 0; i--) {
+			StackTraceElement elem = trace[i];
+
+			if (elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
+					elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	/**
+	 * Callback on completed stack trace sample.
+	 */
+	class StackTraceSampleCompletionCallback implements BiFunction<StackTraceSample, Throwable, Void> {
+
+		private final ExecutionJobVertex vertex;
+
+		public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
+			this.vertex = vertex;
+		}
+
+		/**
+		 * Stores the stats derived from the sample in the cache, unless the
+		 * tracker has been shut down, the job is globally terminal, the sample
+		 * failed or the sample is outdated. The vertex is removed from the
+		 * pending set in every case.
+		 */
+		@Override
+		public Void apply(StackTraceSample stackTraceSample, Throwable throwable) {
+			synchronized (lock) {
+				try {
+					if (shutDown) {
+						return null;
+					}
+
+					// Job finished, ignore.
+					JobStatus jobState = vertex.getGraph().getState();
+					if (jobState.isGloballyTerminalState()) {
+						LOG.debug("Ignoring sample, because job is in state {}.", jobState);
+					} else if (stackTraceSample != null) {
+						Optional<OperatorBackPressureStats> stats = createStatsFromSample(stackTraceSample);
+						if (stats.isPresent()) {
+							operatorStatsCache.put(vertex, stats.get());
+						} else {
+							LOG.debug("Discarding stack trace sample {}, because it does not match " +
+									"the current execution attempts.", stackTraceSample.getSampleId());
+						}
+					} else {
+						LOG.debug("Failed to gather stack trace sample.", throwable);
+					}
+				} catch (Throwable t) {
+					LOG.error("Error during stats completion.", t);
+				} finally {
+					pendingStats.remove(vertex);
+				}
+
+				return null;
+			}
+		}
+
+		/**
+		 * Creates the back pressure stats from a stack trace sample.
+		 *
+		 * @param sample Stack trace sample to base stats on.
+		 *
+		 * @return Back pressure stats, or an empty optional if a sampled
+		 * execution attempt cannot be mapped to a sub task index of the
+		 * current execution attempts (outdated sample)
+		 */
+		private Optional<OperatorBackPressureStats> createStatsFromSample(StackTraceSample sample) {
+			Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
+
+			// Map task ID to subtask index, because the web interface expects
+			// it like that.
+			Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
+					.newHashMapWithExpectedSize(traces.size());
+
+			for (ExecutionVertex task : vertex.getTaskVertices()) {
+				ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
+				if (traces.containsKey(taskId)) {
+					subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
+				} else {
+					LOG.debug("Outdated sample. A task, which is part of the " +
+							"sample has been reset.");
+				}
+			}
+
+			// Ratio of blocked samples to total samples per sub task. Array
+			// position corresponds to sub task index.
+			double[] backPressureRatio = new double[traces.size()];
+
+			for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
+				// The lookup yields null if the sampled attempt is no longer the
+				// current attempt of any sub task. Unboxing it blindly would fail
+				// with a NullPointerException, an index beyond the array with an
+				// ArrayIndexOutOfBoundsException.
+				Integer subtaskIndex = subtaskIndexMap.get(entry.getKey());
+				if (subtaskIndex == null || subtaskIndex < 0 || subtaskIndex >= backPressureRatio.length) {
+					return Optional.empty();
+				}
+
+				List<StackTraceElement[]> taskTraces = entry.getValue();
+
+				int backPressureSamples = 0;
+				for (StackTraceElement[] trace : taskTraces) {
+					if (isBackPressured(trace)) {
+						backPressureSamples++;
+					}
+				}
+
+				int size = taskTraces.size();
+				double ratio = (size > 0)
+						? ((double) backPressureSamples) / size
+						: 0;
+
+				backPressureRatio[subtaskIndex] = ratio;
+			}
+
+			return Optional.of(new OperatorBackPressureStats(
+					sample.getSampleId(),
+					sample.getEndTime(),
+					backPressureRatio));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
new file mode 100644
index 0000000..1a78a17
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics of the sub tasks of one operator.
+ *
+ * <p>The statistics are derived from sampled stack traces of running tasks.
+ * The back pressure ratio of a sub task is the number of traces indicating
+ * back pressure divided by the total number of traces sampled for it.
+ */
+public class OperatorBackPressureStats {
+
+	/** Identifier of the sample these statistics belong to. */
+	private final int sampleId;
+
+	/** Time stamp at which the corresponding sample ended. */
+	private final long endTimestamp;
+
+	/** Back pressure ratios, indexed by sub task. */
+	private final double[] subTaskBackPressureRatio;
+
+	/** Largest value found in {@link #subTaskBackPressureRatio}. */
+	private final double maxSubTaskBackPressureRatio;
+
+	/**
+	 * Creates the statistics and determines the maximum ratio.
+	 *
+	 * @param sampleId                 ID of the corresponding sample.
+	 * @param endTimestamp             End time stamp of the corresponding sample.
+	 * @param subTaskBackPressureRatio Ratio per sub task; must be non-null and
+	 *                                 hold at least one entry.
+	 */
+	public OperatorBackPressureStats(
+			int sampleId,
+			long endTimestamp,
+			double[] subTaskBackPressureRatio) {
+
+		checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
+		checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified");
+
+		this.sampleId = sampleId;
+		this.endTimestamp = endTimestamp;
+		this.subTaskBackPressureRatio = subTaskBackPressureRatio;
+
+		// start from zero, so the maximum is never negative
+		double highest = 0;
+		for (int i = 0; i < subTaskBackPressureRatio.length; i++) {
+			final double candidate = subTaskBackPressureRatio[i];
+			if (candidate > highest) {
+				highest = candidate;
+			}
+		}
+
+		this.maxSubTaskBackPressureRatio = highest;
+	}
+
+	/**
+	 * Returns the ID of the sample.
+	 *
+	 * @return ID of the sample
+	 */
+	public int getSampleId() {
+		return this.sampleId;
+	}
+
+	/**
+	 * Returns the end time stamp of the corresponding sample.
+	 *
+	 * @return End time stamp of the corresponding sample
+	 */
+	public long getEndTimestamp() {
+		return this.endTimestamp;
+	}
+
+	/**
+	 * Returns the number of sub tasks, i.e. the number of ratios held.
+	 *
+	 * @return Number of sub tasks.
+	 */
+	public int getNumberOfSubTasks() {
+		return this.subTaskBackPressureRatio.length;
+	}
+
+	/**
+	 * Returns the back pressure ratio of a single sub task.
+	 *
+	 * @param index Subtask index.
+	 *
+	 * @return Ratio of stack traces indicating back pressure to total number
+	 * of sampled stack traces.
+	 */
+	public double getBackPressureRatio(int index) {
+		return this.subTaskBackPressureRatio[index];
+	}
+
+	/**
+	 * Returns the largest back pressure ratio over all sub tasks.
+	 *
+	 * @return Maximum back pressure ratio of all sub tasks.
+	 */
+	public double getMaxBackPressureRatio() {
+		return this.maxSubTaskBackPressureRatio;
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder text = new StringBuilder("OperatorBackPressureStats{");
+		text.append("sampleId=").append(sampleId);
+		text.append(", endTimestamp=").append(endTimestamp);
+		text.append(", subTaskBackPressureRatio=").append(Arrays.toString(subTaskBackPressureRatio));
+		text.append('}');
+		return text.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
new file mode 100644
index 0000000..dda4e33
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Stack traces gathered for one or more tasks in a single sampling run.
+ *
+ * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSample {
+
+	/** ID of this sample (unique per job). */
+	private final int sampleId;
+
+	/** Time stamp, when the sample was triggered. */
+	private final long startTime;
+
+	/** Time stamp, when all stack traces were collected at the JobManager. */
+	private final long endTime;
+
+	/** Unmodifiable view of the stack traces, keyed by execution ID. */
+	private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+
+	/**
+	 * Creates a stack trace sample.
+	 *
+	 * <p>The sample ID and the start time must not be negative and the end
+	 * time must not lie before the start time.
+	 *
+	 * @param sampleId          ID of the sample.
+	 * @param startTime         Time stamp, when the sample was triggered.
+	 * @param endTime           Time stamp, when all stack traces were
+	 *                          collected at the JobManager.
+	 * @param stackTracesByTask Map of stack traces by execution ID.
+	 */
+	public StackTraceSample(
+			int sampleId,
+			long startTime,
+			long endTime,
+			Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) {
+
+		final boolean validId = sampleId >= 0;
+		checkArgument(validId, "Negative sample ID");
+
+		final boolean validStart = startTime >= 0;
+		checkArgument(validStart, "Negative start time");
+
+		final boolean validEnd = endTime >= startTime;
+		checkArgument(validEnd, "End time before start time");
+
+		// expose the traces read-only; the given map itself is wrapped, not copied
+		this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask);
+		this.endTime = endTime;
+		this.startTime = startTime;
+		this.sampleId = sampleId;
+	}
+
+	/**
+	 * Returns the ID of the sample.
+	 *
+	 * @return ID of the sample
+	 */
+	public int getSampleId() {
+		return this.sampleId;
+	}
+
+	/**
+	 * Returns the time stamp, when the sample was triggered.
+	 *
+	 * @return Time stamp, when the sample was triggered
+	 */
+	public long getStartTime() {
+		return this.startTime;
+	}
+
+	/**
+	 * Returns the time stamp, when all stack traces were collected at the
+	 * JobManager.
+	 *
+	 * @return Time stamp, when all stack traces were collected at the
+	 * JobManager
+	 */
+	public long getEndTime() {
+		return this.endTime;
+	}
+
+	/**
+	 * Returns the unmodifiable map of stack traces by execution ID.
+	 *
+	 * @return Map of stack traces by execution ID
+	 */
+	public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
+		return this.stackTracesByTask;
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder text = new StringBuilder("StackTraceSample{");
+		text.append("sampleId=").append(sampleId);
+		text.append(", startTime=").append(startTime);
+		text.append(", endTime=").append(endTime);
+		text.append('}');
+		return text.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
new file mode 100644
index 0000000..8c2ec6e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A coordinator for triggering and collecting stack traces of running tasks.
+ */
+public class StackTraceSampleCoordinator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
+
+	private static final int NUM_GHOST_SAMPLE_IDS = 10;
+
+	private final Object lock = new Object();
+
+	/** Executor used to run the futures. */
+	private final Executor executor;
+
+	/** Time out after the expected sampling duration. */
+	private final long sampleTimeout;
+
+	/** In progress samples (guarded by lock). */
+	private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>();
+
+	/** A list of recent sample IDs to identify late messages vs. invalid ones. */
+	private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
+
+	/** Sample ID counter (guarded by lock). */
+	private int sampleIdCounter;
+
+	/**
+	 * Flag indicating whether the coordinator is still running (guarded by
+	 * lock).
+	 */
+	private boolean isShutDown;
+
+	/**
+	 * Creates a new coordinator.
+	 *
+	 * @param executor Executor used to run the handlers of the sample responses.
+	 * @param sampleTimeout Non-negative time out in milliseconds, added on top of
+	 *                      the expected sampling duration (number of samples
+	 *                      times the delay between samples).
+	 * @throws IllegalArgumentException If the sample timeout is negative.
+	 */
+	public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) {
+		checkArgument(sampleTimeout >= 0L, "Negative sample timeout");
+		this.executor = Preconditions.checkNotNull(executor, "Executor");
+		this.sampleTimeout = sampleTimeout;
+	}
+
+	/**
+	 * Triggers a stack trace sample to all tasks. The returned future fails
+	 * if a task is not RUNNING or the coordinator has been shut down.
+	 *
+	 * @param tasksToSample       Tasks to sample.
+	 * @param numSamples          Number of stack trace samples to collect.
+	 * @param delayBetweenSamples Delay between consecutive samples.
+	 * @param maxStackTraceDepth  Maximum depth of the stack trace. 0 indicates
+	 *                            no maximum and keeps the complete stack trace.
+	 * @return A future of the completed stack trace sample
+	 */
+	@SuppressWarnings("unchecked")
+	public CompletableFuture<StackTraceSample> triggerStackTraceSample(
+			ExecutionVertex[] tasksToSample,
+			int numSamples,
+			Time delayBetweenSamples,
+			int maxStackTraceDepth) {
+
+		checkNotNull(tasksToSample, "Tasks to sample");
+		checkArgument(tasksToSample.length >= 1, "No tasks to sample");
+		checkArgument(numSamples >= 1, "No number of samples");
+		checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+		checkNotNull(delayBetweenSamples, "Delay between samples");
+
+		// Execution IDs of running tasks
+		ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
+		Execution[] executions = new Execution[tasksToSample.length];
+
+		// Check that all tasks are RUNNING before triggering anything. The
+		// triggering can still fail.
+		for (int i = 0; i < triggerIds.length; i++) {
+			Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
+			if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+				executions[i] = execution;
+				triggerIds[i] = execution.getAttemptId();
+			} else {
+				return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i]
+					.getTaskNameWithSubtaskIndex() + " is not running."));
+			}
+		}
+
+		synchronized (lock) {
+			if (isShutDown) {
+				return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
+			}
+
+			final int sampleId = sampleIdCounter++;
+
+			LOG.debug("Triggering stack trace sample {}", sampleId);
+
+			final PendingStackTraceSample pending = new PendingStackTraceSample(sampleId, triggerIds);
+
+			// Time out the requests after the expected sampling duration plus the
+			// configured slack. No cancel messages are sent to the task managers.
+			long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
+			Time timeout = Time.milliseconds(expectedDuration + sampleTimeout);
+
+			// Register the pending sample before triggering the requests, so that
+			// the response handlers can look it up.
+			pendingSamples.put(sampleId, pending);
+
+			// Trigger all samples
+			for (Execution execution: executions) {
+				final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample(
+					sampleId,
+					numSamples,
+					delayBetweenSamples,
+					maxStackTraceDepth,
+					timeout);
+
+				stackTraceSampleFuture.handleAsync(
+					(StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> {
+						if (stackTraceSampleResponse != null) {
+							collectStackTraces(
+								stackTraceSampleResponse.getSampleId(),
+								stackTraceSampleResponse.getExecutionAttemptID(),
+								stackTraceSampleResponse.getSamples());
+						} else {
+							cancelStackTraceSample(sampleId, throwable);
+						}
+
+						return null;
+					},
+					executor);
+			}
+
+			return pending.getStackTraceSampleFuture();
+		}
+	}
+
+	/**
+	 * Cancels the pending sample with the given ID, if there is one.
+	 *
+	 * @param sampleId ID of the sample to cancel.
+	 * @param cause Reason for the cancellation (may be <code>null</code>).
+	 */
+	public void cancelStackTraceSample(int sampleId, Throwable cause) {
+		synchronized (lock) {
+			// Nothing to cancel after shut down, or when the sample is not
+			// (or no longer) pending.
+			final PendingStackTraceSample cancelled = isShutDown ? null : pendingSamples.remove(sampleId);
+			if (cancelled == null) {
+				return;
+			}
+
+			if (cause == null) {
+				LOG.info("Cancelling sample {}", sampleId);
+			} else {
+				LOG.info("Cancelling sample " + sampleId, cause);
+			}
+
+			cancelled.discard(cause);
+			rememberRecentSampleId(sampleId);
+		}
+	}
+
+	/**
+	 * Shuts down the coordinator and discards all pending samples.
+	 *
+	 * <p>After shut down, no further operations are executed.
+	 */
+	public void shutDown() {
+		synchronized (lock) {
+			if (isShutDown) {
+				return;
+			}
+
+			LOG.info("Shutting down stack trace sample coordinator.");
+			// Fail every in-flight sample before dropping the references to them.
+			for (PendingStackTraceSample inFlight : pendingSamples.values()) {
+				inFlight.discard(new RuntimeException("Shut down"));
+			}
+			pendingSamples.clear();
+			isShutDown = true;
+		}
+	}
+
+	/**
+	 * Collects stack traces of a task.
+	 *
+	 * <p>Responses after shut down are dropped; responses for samples that are
+	 * unknown or were recently finished or cancelled are only logged.
+	 *
+	 * @param sampleId    ID of the sample.
+	 * @param executionId ID of the sampled task.
+	 * @param stackTraces Stack traces of the sampled task.
+	 *
+	 * @throws IllegalArgumentException If the sample is pending, but is not
+	 *                                  (or no longer) waiting for the task.
+	 */
+	public void collectStackTraces(
+			int sampleId,
+			ExecutionAttemptID executionId,
+			List<StackTraceElement[]> stackTraces) {
+
+		synchronized (lock) {
+			if (isShutDown) {
+				return;
+			}
+
+			// Parameterized logging is cheap when debug is off, no explicit guard needed.
+			LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId);
+
+			PendingStackTraceSample pending = pendingSamples.get(sampleId);
+
+			if (pending != null) {
+				pending.collectStackTraces(executionId, stackTraces);
+
+				// Publish the sample
+				if (pending.isComplete()) {
+					pendingSamples.remove(sampleId);
+					rememberRecentSampleId(sampleId);
+
+					pending.completePromiseAndDiscard();
+				}
+			} else if (recentPendingSamples.contains(sampleId)) {
+				// Late response for a sample that has already finished or been cancelled.
+				LOG.debug("Received late stack trace sample {} of task {}",
+						sampleId, executionId);
+			} else {
+				// Neither pending nor among the recently remembered sample IDs.
+				LOG.debug("Unknown sample ID {}", sampleId);
+			}
+		}
+	}
+
+	private void rememberRecentSampleId(int sampleId) {
+		if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
+			recentPendingSamples.removeFirst();
+		}
+		recentPendingSamples.addLast(sampleId);
+	}
+
+	int getNumberOfPendingSamples() {
+		synchronized (lock) {
+			return pendingSamples.size();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A pending stack trace sample, which collects stack traces and owns a
+	 * {@link StackTraceSample} promise.
+	 *
+	 * <p>Access pending sample in lock scope.
+	 */
+	private static class PendingStackTraceSample {
+
+		private final int sampleId;
+		private final long startTime;
+		private final Set<ExecutionAttemptID> pendingTasks;
+		private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+		private final CompletableFuture<StackTraceSample> stackTraceFuture;
+
+		private boolean isDiscarded;
+
+		PendingStackTraceSample(
+				int sampleId,
+				ExecutionAttemptID[] tasksToCollect) {
+
+			this.sampleId = sampleId;
+			this.startTime = System.currentTimeMillis();
+			this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
+			this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
+			this.stackTraceFuture = new CompletableFuture<>();
+		}
+
+		int getSampleId() {
+			return sampleId;
+		}
+
+		long getStartTime() {
+			return startTime;
+		}
+
+		boolean isDiscarded() {
+			return isDiscarded;
+		}
+
+		boolean isComplete() {
+			if (isDiscarded) {
+				throw new IllegalStateException("Discarded");
+			}
+
+			return pendingTasks.isEmpty();
+		}
+
+		void discard(Throwable cause) {
+			if (!isDiscarded) {
+				pendingTasks.clear();
+				stackTracesByTask.clear();
+
+				stackTraceFuture.completeExceptionally(new RuntimeException("Discarded", cause));
+
+				isDiscarded = true;
+			}
+		}
+
+		void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
+			if (isDiscarded) {
+				throw new IllegalStateException("Discarded");
+			}
+
+			if (pendingTasks.remove(executionId)) {
+				stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
+			} else if (isComplete()) {
+				throw new IllegalStateException("Completed");
+			} else {
+				throw new IllegalArgumentException("Unknown task " + executionId);
+			}
+		}
+
+		void completePromiseAndDiscard() {
+			if (isComplete()) {
+				isDiscarded = true;
+
+				long endTime = System.currentTimeMillis();
+
+				StackTraceSample stackTraceSample = new StackTraceSample(
+						sampleId,
+						startTime,
+						endTime,
+						stackTracesByTask);
+
+				stackTraceFuture.complete(stackTraceSample);
+			} else {
+				throw new IllegalStateException("Not completed yet");
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		CompletableFuture<StackTraceSample> getStackTraceSampleFuture() {
+			return stackTraceFuture;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..2086628
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns a job's snapshotting settings.
+ */
+public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
+
+	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CHECKPOINT_CONFIG_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointConfigJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint config json.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Archivist for the CheckpointConfigHandler.
+	 */
+	public static class CheckpointConfigJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createCheckpointConfigJson(graph);
+			String path = CHECKPOINT_CONFIG_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
+	/**
+	 * Serializes the checkpointing settings of the given graph as JSON.
+	 *
+	 * @param graph Execution graph whose checkpointing settings are written.
+	 * @return The settings as JSON, or <code>{}</code> if the graph has none.
+	 * @throws IOException If writing the JSON fails.
+	 */
+	private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
+		JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
+		if (settings == null) {
+			// Bail out before a generator is created, so none is left unclosed.
+			return "{}";
+		}
+
+		StringWriter writer = new StringWriter();
+		// try-with-resources closes (and thereby flushes) the generator on every path.
+		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+			gen.writeStartObject();
+			gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
+			gen.writeNumberField("interval", settings.getCheckpointInterval());
+			gen.writeNumberField("timeout", settings.getCheckpointTimeout());
+			gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
+			gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
+			ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
+			gen.writeObjectFieldStart("externalization");
+			boolean externalized = externalization.externalizeCheckpoints();
+			gen.writeBooleanField("enabled", externalized);
+			if (externalized) {
+				gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
+			}
+			gen.writeEndObject();
+			gen.writeEndObject();
+		}
+
+		return writer.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..f21fc76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed checkpoints for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example if you manage to click on the last
+ * checkpoint in the history, it is not available via the stats as soon
+ * as another checkpoint is triggered. With the cache in place, the
+ * checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+	@Nullable
+	private final Cache<Long, AbstractCheckpointStats> cache;
+
+	public CheckpointStatsCache(int maxNumEntries) {
+		if (maxNumEntries > 0) {
+			this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
+				.maximumSize(maxNumEntries)
+				.build();
+		} else {
+			this.cache = null;
+		}
+	}
+
+	/**
+	 * Adds the checkpoint to the cache, unless it is null or still in progress.
+	 *
+	 * @param checkpoint Checkpoint to be added.
+	 */
+	void tryAdd(AbstractCheckpointStats checkpoint) {
+		// Skip in-progress checkpoints: their completed/failed version replaces them later.
+		boolean cacheable = cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress();
+		if (cacheable) {
+			cache.put(checkpoint.getCheckpointId(), checkpoint);
+		}
+	}
+
+	/**
+	 * Try to look up a checkpoint by its ID in the cache.
+	 *
+	 * @param checkpointId ID of the checkpoint to look up.
+	 * @return The checkpoint or <code>null</code> if checkpoint not found.
+	 */
+	@Nullable
+	AbstractCheckpointStats tryGet(long checkpointId) {
+		if (cache == null) {
+			return null;
+		}
+		return cache.getIfPresent(checkpointId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
new file mode 100644
index 0000000..61ebeda
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the details of a single checkpoint of a job.
+ */
+public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid";
+
+	private final CheckpointStatsCache cache;
+
+	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
+		this.cache = cache;
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				final long id = parseCheckpointId(params);
+				final CheckpointStatsSnapshot stats = id == -1 ? null : graph.getCheckpointStatsSnapshot();
+				if (stats == null) {
+					// Invalid checkpoint ID or no checkpoint stats available for the job.
+					return "{}";
+				}
+
+				// Prefer the live history and remember the hit in the cache;
+				// fall back to the cache when the history no longer has the entry.
+				final AbstractCheckpointStats fromHistory = stats.getHistory().getCheckpointById(id);
+				final AbstractCheckpointStats checkpoint;
+				if (fromHistory == null) {
+					checkpoint = cache.tryGet(id);
+				} else {
+					cache.tryAdd(fromHistory);
+					checkpoint = fromHistory;
+				}
+
+				if (checkpoint == null) {
+					return "{}";
+				}
+
+				try {
+					return createCheckpointDetailsJson(checkpoint);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint details json.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Archivist for the CheckpointStatsDetails.
+	 */
+	public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+			if (stats == null) {
+				return Collections.emptyList();
+			}
+			CheckpointStatsHistory history = stats.getHistory();
+			List<ArchivedJson> archive = new ArrayList<>();
+			for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+				String json = createCheckpointDetailsJson(checkpoint);
+				String path = CHECKPOINT_STATS_DETAILS_REST_PATH
+					.replace(":jobid", graph.getJobID().toString())
+					.replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()));
+				archive.add(new ArchivedJson(path, json));
+			}
+			return archive;
+		}
+	}
+
+	public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+		gen.writeStartObject();
+
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeStringField("status", checkpoint.getStatus().toString());
+		gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
+		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", checkpoint.getStateSize());
+		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+		gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+		gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+		if (checkpoint.getStatus().isCompleted()) {
+			// --- Completed ---
+			CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+			String externalPath = completed.getExternalPath();
+			if (externalPath != null) {
+				gen.writeStringField("external_path", externalPath);
+			}
+
+			gen.writeBooleanField("discarded", completed.isDiscarded());
+		}
+		else if (checkpoint.getStatus().isFailed()) {
+			// --- Failed ---
+			FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
+
+			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+			String failureMsg = failed.getFailureMessage();
+			if (failureMsg != null) {
+				gen.writeStringField("failure_message", failureMsg);
+			}
+		}
+
+		gen.writeObjectFieldStart("tasks");
+		for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+			gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
+
+			gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+			gen.writeNumberField("state_size", taskStats.getStateSize());
+			gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+			gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+			gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+			gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+			gen.writeEndObject();
+		}
+		gen.writeEndObject();
+
+		gen.writeEndObject();
+		gen.close();
+
+		return writer.toString();
+	}
+
+	/**
+	 * Returns the checkpoint ID parsed from the provided parameters.
+	 *
+	 * @param params Path parameters
+	 * @return Parsed checkpoint ID or <code>-1</code> if not available.
+	 */
+	static long parseCheckpointId(Map<String, String> params) {
+		String param = params.get("checkpointid");
+		if (param == null) {
+			return -1;
+		}
+
+		try {
+			return Long.parseLong(param);
+		} catch (NumberFormatException ignored) {
+			return -1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
new file mode 100644
index 0000000..22a8db2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex with
+ * the summary stats and all subtasks.
+ *
+ * <p>If the requested statistics cannot be resolved (no usable checkpoint or vertex ID in the
+ * parameters, no checkpoint stats snapshot, unknown checkpoint or unknown vertex), the handler
+ * responds with an empty JSON object instead of failing the request.
+ */
+public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
+	/** Response body used whenever the requested statistics cannot be resolved. */
+	private static final String EMPTY_JSON = "{}";
+
+	/** Fallback for checkpoints that the current stats history does not contain. */
+	private final CheckpointStatsCache cache;
+
+	/**
+	 * Creates the handler.
+	 *
+	 * @param executionGraphHolder Passed on to the parent handler
+	 * @param executor Passed on to the parent handler
+	 * @param cache Cache for checkpoint stats, must not be <code>null</code>
+	 */
+	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+		super(executionGraphHolder, executor);
+		this.cache = checkNotNull(cache);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
+	}
+
+	/**
+	 * Delegates unchanged to the parent implementation.
+	 */
+	@Override
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
+		return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+	}
+
+	/**
+	 * Resolves the checkpoint and job vertex named in the parameters and returns their
+	 * subtask checkpoint details as JSON.
+	 *
+	 * @param graph Execution graph to read the checkpoint stats from
+	 * @param params Request parameters containing the checkpoint and vertex IDs
+	 * @return Future with the JSON response; an empty JSON object if the statistics cannot be
+	 *         resolved, or an exceptionally completed future if writing the JSON fails.
+	 */
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
+		if (checkpointId == -1) {
+			return CompletableFuture.completedFuture(EMPTY_JSON);
+		}
+
+		JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
+		if (vertexId == null) {
+			return CompletableFuture.completedFuture(EMPTY_JSON);
+		}
+
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return CompletableFuture.completedFuture(EMPTY_JSON);
+		}
+
+		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+		if (checkpoint != null) {
+			// Found in the history: offer it to the cache for later requests.
+			cache.tryAdd(checkpoint);
+		} else {
+			// Not in the history: fall back to the cache.
+			checkpoint = cache.tryGet(checkpointId);
+
+			if (checkpoint == null) {
+				return CompletableFuture.completedFuture(EMPTY_JSON);
+			}
+		}
+
+		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
+		if (taskStats == null) {
+			return CompletableFuture.completedFuture(EMPTY_JSON);
+		}
+
+		try {
+			return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+
+	/**
+	 * Archivist for the CheckpointStatsDetailsSubtasksHandler.
+	 */
+	public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
+
+		/**
+		 * Creates one archived JSON entry per checkpoint in the history and per task of that checkpoint.
+		 *
+		 * @param graph Execution graph to archive the checkpoint stats of
+		 * @return Archived entries, or an empty list if the graph has no checkpoint stats snapshot
+		 * @throws IOException If writing the JSON fails
+		 */
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+			if (stats == null) {
+				return Collections.emptyList();
+			}
+			CheckpointStatsHistory history = stats.getHistory();
+
+			// The job ID is the same for every entry, so substitute it once up front.
+			String jobPath = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+
+			List<ArchivedJson> archive = new ArrayList<>();
+			for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+				String checkpointPath = jobPath
+					.replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()));
+
+				for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+					String json = createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+					String path = checkpointPath
+						.replace(":vertexid", taskStats.getJobVertexId().toString());
+					archive.add(new ArchivedJson(path, json));
+				}
+			}
+			return archive;
+		}
+	}
+
+	/**
+	 * Writes the checkpoint details of a single task as JSON: an overview, the summary over the
+	 * acknowledged subtasks (only if at least one subtask has acknowledged) and one entry per subtask.
+	 *
+	 * @param checkpoint Checkpoint the task stats belong to
+	 * @param taskStats Stats of the task to write
+	 * @return The JSON string
+	 * @throws IOException If writing the JSON fails
+	 */
+	private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
+		StringWriter writer = new StringWriter();
+		final long triggerTimestamp = checkpoint.getTriggerTimestamp();
+
+		// try-with-resources closes the generator even if one of the writes fails
+		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+			gen.writeStartObject();
+			// Overview
+			gen.writeNumberField("id", checkpoint.getCheckpointId());
+			gen.writeStringField("status", checkpoint.getStatus().toString());
+			gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
+			gen.writeNumberField("state_size", taskStats.getStateSize());
+			gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(triggerTimestamp));
+			gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
+			gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
+			gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
+
+			if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
+				writeSummary(gen, taskStats, triggerTimestamp);
+			}
+
+			writeSubtasks(gen, taskStats.getSubtaskStats(), triggerTimestamp);
+
+			gen.writeEndObject();
+		}
+
+		return writer.toString();
+	}
+
+	/**
+	 * Writes the "summary" object with the min/max/avg stats of the task.
+	 */
+	private static void writeSummary(JsonGenerator gen, TaskStateStats taskStats, long triggerTimestamp) throws IOException {
+		gen.writeObjectFieldStart("summary");
+		gen.writeObjectFieldStart("state_size");
+		CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats());
+		gen.writeEndObject();
+
+		// Durations are derived from the ack timestamps relative to the trigger timestamp
+		// and are clamped at 0 so that no negative duration is reported.
+		gen.writeObjectFieldStart("end_to_end_duration");
+		MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats();
+		gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - triggerTimestamp));
+		gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - triggerTimestamp));
+		gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - triggerTimestamp));
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("checkpoint_duration");
+		gen.writeObjectFieldStart("sync");
+		CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats());
+		gen.writeEndObject();
+		gen.writeObjectFieldStart("async");
+		CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
+		gen.writeEndObject();
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("alignment");
+		gen.writeObjectFieldStart("buffered");
+		CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats());
+		gen.writeEndObject();
+		gen.writeObjectFieldStart("duration");
+		CheckpointStatsHandler.writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats());
+		gen.writeEndObject();
+		gen.writeEndObject();
+		gen.writeEndObject();
+	}
+
+	/**
+	 * Writes the "subtasks" array with one object per subtask index.
+	 */
+	private static void writeSubtasks(JsonGenerator gen, SubtaskStateStats[] subtasks, long triggerTimestamp) throws IOException {
+		gen.writeArrayFieldStart("subtasks");
+		for (int i = 0; i < subtasks.length; i++) {
+			SubtaskStateStats subtask = subtasks[i];
+
+			gen.writeStartObject();
+			gen.writeNumberField("index", i);
+
+			if (subtask != null) {
+				gen.writeStringField("status", "completed");
+				gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp());
+				gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(triggerTimestamp));
+				gen.writeNumberField("state_size", subtask.getStateSize());
+
+				gen.writeObjectFieldStart("checkpoint");
+				gen.writeNumberField("sync", subtask.getSyncCheckpointDuration());
+				gen.writeNumberField("async", subtask.getAsyncCheckpointDuration());
+				gen.writeEndObject();
+
+				gen.writeObjectFieldStart("alignment");
+				gen.writeNumberField("buffered", subtask.getAlignmentBuffered());
+				gen.writeNumberField("duration", subtask.getAlignmentDuration());
+				gen.writeEndObject();
+			} else {
+				// No stats entry for this subtask index
+				gen.writeStringField("status", "pending_or_failed");
+			}
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
new file mode 100644
index 0000000..abb353e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns checkpoint statistics for a job.
+ *
+ * <p>The response contains the checkpoint counts, the summary statistics, the latest
+ * completed checkpoint, savepoint, failed checkpoint and restored checkpoint (each only if
+ * present) and the checkpoint history. If the job has no checkpoint stats snapshot, an empty
+ * JSON object is returned.
+ */
+public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
+
+	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
+
+	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{CHECKPOINT_STATS_REST_PATH};
+	}
+
+	/**
+	 * Creates the checkpoint stats JSON for the given graph on the handler's executor.
+	 *
+	 * @param graph Execution graph to read the checkpoint stats from
+	 * @param params Request parameters (not used by this handler)
+	 * @return Future with the JSON response; it fails with a {@link FlinkFutureException} if writing the JSON fails.
+	 */
+	@Override
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createCheckpointStatsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+				}
+			},
+			executor);
+	}
+
+	/**
+	 * Archivist for the {@link CheckpointStatsHandler}.
+	 */
+	public static class CheckpointStatsJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			String json = createCheckpointStatsJson(graph);
+			String path = CHECKPOINT_STATS_REST_PATH
+				.replace(":jobid", graph.getJobID().toString());
+			return Collections.singletonList(new ArchivedJson(path, json));
+		}
+	}
+
+	/**
+	 * Writes the checkpoint statistics of the given graph as JSON.
+	 *
+	 * @param graph Execution graph to read the checkpoint stats from
+	 * @return The JSON string, or <code>{}</code> if the graph has no checkpoint stats snapshot
+	 * @throws IOException If writing the JSON fails
+	 */
+	private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			// Checked before the generator is created, so nothing is left unclosed on this path.
+			return "{}";
+		}
+
+		StringWriter writer = new StringWriter();
+
+		// try-with-resources closes the generator even if one of the writes fails
+		try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+			gen.writeStartObject();
+
+			// Counts
+			writeCounts(gen, snapshot.getCounts());
+
+			// Summary
+			writeSummary(gen, snapshot.getSummaryStats());
+
+			CheckpointStatsHistory history = snapshot.getHistory();
+
+			// Latest
+			writeLatestCheckpoints(
+				gen,
+				history.getLatestCompletedCheckpoint(),
+				history.getLatestSavepoint(),
+				history.getLatestFailedCheckpoint(),
+				snapshot.getLatestRestoredCheckpoint());
+
+			// History
+			writeHistory(gen, history);
+
+			gen.writeEndObject();
+		}
+
+		return writer.toString();
+	}
+
+	/**
+	 * Writes the "counts" object.
+	 */
+	private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
+		gen.writeObjectFieldStart("counts");
+		gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
+		gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
+		gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
+		gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
+		gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
+		gen.writeEndObject();
+	}
+
+	/**
+	 * Writes the "summary" object with the min/max/avg of state size, end-to-end duration
+	 * and buffered alignment data.
+	 */
+	private static void writeSummary(
+		JsonGenerator gen,
+		CompletedCheckpointStatsSummary summary) throws IOException {
+		gen.writeObjectFieldStart("summary");
+		gen.writeObjectFieldStart("state_size");
+		writeMinMaxAvg(gen, summary.getStateSizeStats());
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("end_to_end_duration");
+		writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
+		gen.writeEndObject();
+
+		gen.writeObjectFieldStart("alignment_buffered");
+		writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
+		gen.writeEndObject();
+		gen.writeEndObject();
+	}
+
+	/**
+	 * Writes the "min", "max" and "avg" fields of the given stats into the currently open object.
+	 *
+	 * @param gen Generator to write to
+	 * @param minMaxAvg Stats to write
+	 * @throws IOException If writing the JSON fails
+	 */
+	static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+		gen.writeNumberField("min", minMaxAvg.getMinimum());
+		gen.writeNumberField("max", minMaxAvg.getMaximum());
+		gen.writeNumberField("avg", minMaxAvg.getAverage());
+	}
+
+	/**
+	 * Writes the "latest" object. Each nested entry is only written if the corresponding
+	 * stats instance is not <code>null</code>.
+	 */
+	private static void writeLatestCheckpoints(
+		JsonGenerator gen,
+		@Nullable CompletedCheckpointStats completed,
+		@Nullable CompletedCheckpointStats savepoint,
+		@Nullable FailedCheckpointStats failed,
+		@Nullable RestoredCheckpointStats restored) throws IOException {
+
+		gen.writeObjectFieldStart("latest");
+		// Completed checkpoint
+		if (completed != null) {
+			gen.writeObjectFieldStart("completed");
+			writeCheckpoint(gen, completed);
+			writeExternalPath(gen, completed.getExternalPath());
+			gen.writeEndObject();
+		}
+
+		// Completed savepoint
+		if (savepoint != null) {
+			gen.writeObjectFieldStart("savepoint");
+			writeCheckpoint(gen, savepoint);
+			writeExternalPath(gen, savepoint.getExternalPath());
+			gen.writeEndObject();
+		}
+
+		// Failed checkpoint
+		if (failed != null) {
+			gen.writeObjectFieldStart("failed");
+			writeCheckpoint(gen, failed);
+			writeFailureDetails(gen, failed);
+			gen.writeEndObject();
+		}
+
+		// Restored checkpoint
+		if (restored != null) {
+			gen.writeObjectFieldStart("restored");
+			gen.writeNumberField("id", restored.getCheckpointId());
+			gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
+			gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
+			writeExternalPath(gen, restored.getExternalPath());
+			gen.writeEndObject();
+		}
+		gen.writeEndObject();
+	}
+
+	/**
+	 * Writes the fields shared by the latest completed checkpoint, savepoint and failed checkpoint.
+	 */
+	private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
+		gen.writeNumberField("id", checkpoint.getCheckpointId());
+		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField("state_size", checkpoint.getStateSize());
+		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+	}
+
+	/**
+	 * Writes the "external_path" field, or nothing if the path is <code>null</code>.
+	 */
+	private static void writeExternalPath(JsonGenerator gen, @Nullable String externalPath) throws IOException {
+		if (externalPath != null) {
+			gen.writeStringField("external_path", externalPath);
+		}
+	}
+
+	/**
+	 * Writes the "failure_timestamp" field and, if a message is available, the "failure_message" field.
+	 */
+	private static void writeFailureDetails(JsonGenerator gen, FailedCheckpointStats failed) throws IOException {
+		gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+
+		String failureMsg = failed.getFailureMessage();
+		if (failureMsg != null) {
+			gen.writeStringField("failure_message", failureMsg);
+		}
+	}
+
+	/**
+	 * Writes the "history" array with one object per checkpoint in the history. Completed and
+	 * failed checkpoints get additional status-specific fields.
+	 */
+	private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
+		gen.writeArrayFieldStart("history");
+		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+			gen.writeStartObject();
+			gen.writeNumberField("id", checkpoint.getCheckpointId());
+			gen.writeStringField("status", checkpoint.getStatus().toString());
+			gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
+			gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
+			gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
+			gen.writeNumberField("state_size", checkpoint.getStateSize());
+			gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
+			gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+			gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
+			gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+
+			if (checkpoint.getStatus().isCompleted()) {
+				// --- Completed ---
+				CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
+
+				writeExternalPath(gen, completed.getExternalPath());
+
+				gen.writeBooleanField("discarded", completed.isDiscarded());
+			}
+			else if (checkpoint.getStatus().isFailed()) {
+				// --- Failed ---
+				writeFailureDetails(gen, (FailedCheckpointStats) checkpoint);
+			}
+
+			gen.writeEndObject();
+		}
+		gen.writeEndArray();
+	}
+}


Mime
View raw message