flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [13/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:23 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
deleted file mode 100644
index 9f83ed0..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
-import org.apache.flink.util.StringUtils;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A request handler that provides an overview over all taskmanagers or details for a single one.
- */
-public class TaskManagersHandler extends AbstractJsonRequestHandler  {
-
-	private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
-	private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid";
-
-	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
-
-	private final Time timeout;
-
-	private final MetricFetcher fetcher;
-
-	public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
-		super(executor);
-		this.timeout = requireNonNull(timeout);
-		this.fetcher = fetcher;
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		if (jobManagerGateway != null) {
-			// whether one task manager's metrics are requested, or all task manager, we
-			// return them in an array. This avoids unnecessary code complexity.
-			// If only one task manager is requested, we only fetch one task manager metrics.
-			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-				InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-				CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
-				return tmInstanceFuture.thenApplyAsync(
-					(Optional<Instance> optTaskManager) -> {
-						try {
-							return writeTaskManagersJson(
-								optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
-								pathParams);
-						} catch (IOException e) {
-							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
-						}
-					},
-					executor);
-			} else {
-				CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-				return tmInstancesFuture.thenApplyAsync(
-					(Collection<Instance> taskManagers) -> {
-						try {
-							return writeTaskManagersJson(taskManagers, pathParams);
-						} catch (IOException e) {
-							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
-						}
-					},
-					executor);
-			}
-		}
-		else {
-			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
-		}
-	}
-
-	private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		gen.writeStartObject();
-		gen.writeArrayFieldStart("taskmanagers");
-
-		for (Instance instance : instances) {
-			gen.writeStartObject();
-			gen.writeStringField("id", instance.getId().toString());
-			gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
-			gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
-			gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
-			gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
-			gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
-			gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
-			gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
-			gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
-			gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
-
-			// only send metrics when only one task manager requests them.
-			if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-				fetcher.update();
-				MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-				if (metrics != null) {
-					gen.writeObjectFieldStart("metrics");
-					long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-					long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-					long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-					gen.writeNumberField("heapCommitted", heapCommitted);
-					gen.writeNumberField("heapUsed", heapUsed);
-					gen.writeNumberField("heapMax", heapTotal);
-
-					long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-					long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-					long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-					gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-					gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-					gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-					gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-					gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-					gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-					long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-					long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-					long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-					gen.writeNumberField("directCount", directCount);
-					gen.writeNumberField("directUsed", directUsed);
-					gen.writeNumberField("directMax", directMax);
-
-					long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-					long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-					long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-					gen.writeNumberField("mappedCount", mappedCount);
-					gen.writeNumberField("mappedUsed", mappedUsed);
-					gen.writeNumberField("mappedMax", mappedMax);
-
-					long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-					long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-					gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-					gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-					gen.writeArrayFieldStart("garbageCollectors");
-
-					for (String gcName : metrics.garbageCollectorNames) {
-						String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-						String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-						if (count != null  && time != null) {
-							gen.writeStartObject();
-							gen.writeStringField("name", gcName);
-							gen.writeNumberField("count", Long.valueOf(count));
-							gen.writeNumberField("time", Long.valueOf(time));
-							gen.writeEndObject();
-						}
-					}
-
-					gen.writeEndArray();
-					gen.writeEndObject();
-				}
-			}
-
-			gen.writeEndObject();
-		}
-
-		gen.writeEndArray();
-		gen.writeEndObject();
-
-		gen.close();
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
deleted file mode 100644
index 3affd7c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler that returns a job's snapshotting settings.
- */
-public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
-
-	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{CHECKPOINT_CONFIG_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					return createCheckpointConfigJson(graph);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create checkpoint config json.", e);
-				}
-			},
-			executor);
-	}
-
-	/**
-	 * Archivist for the CheckpointConfigHandler.
-	 */
-	public static class CheckpointConfigJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			String json = createCheckpointConfigJson(graph);
-			String path = CHECKPOINT_CONFIG_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			return Collections.singletonList(new ArchivedJson(path, json));
-		}
-	}
-
-	private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-		JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
-
-		if (settings == null) {
-			return "{}";
-		}
-
-		gen.writeStartObject();
-		{
-			gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
-			gen.writeNumberField("interval", settings.getCheckpointInterval());
-			gen.writeNumberField("timeout", settings.getCheckpointTimeout());
-			gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
-			gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
-
-			ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
-			gen.writeObjectFieldStart("externalization");
-			{
-				if (externalization.externalizeCheckpoints()) {
-					gen.writeBooleanField("enabled", true);
-					gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
-				} else {
-					gen.writeBooleanField("enabled", false);
-				}
-			}
-			gen.writeEndObject();
-
-		}
-		gen.writeEndObject();
-
-		gen.close();
-
-		return writer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
deleted file mode 100644
index 974364d..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-
-import javax.annotation.Nullable;
-
-/**
- * A size-based cache of accessed checkpoints for completed and failed
- * checkpoints.
- *
- * <p>Having this cache in place for accessed stats improves the user
- * experience quite a bit as accessed checkpoint stats stay available
- * and don't expire. For example if you manage to click on the last
- * checkpoint in the history, it is not available via the stats as soon
- * as another checkpoint is triggered. With the cache in place, the
- * checkpoint will still be available for investigation.
- */
-public class CheckpointStatsCache {
-
-	@Nullable
-	private final Cache<Long, AbstractCheckpointStats> cache;
-
-	public CheckpointStatsCache(int maxNumEntries) {
-		if (maxNumEntries > 0) {
-			this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
-				.maximumSize(maxNumEntries)
-				.build();
-		} else {
-			this.cache = null;
-		}
-	}
-
-	/**
-	 * Try to add the checkpoint to the cache.
-	 *
-	 * @param checkpoint Checkpoint to be added.
-	 */
-	void tryAdd(AbstractCheckpointStats checkpoint) {
-		// Don't add in progress checkpoints as they will be replaced by their
-		// completed/failed version eventually.
-		if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
-			cache.put(checkpoint.getCheckpointId(), checkpoint);
-		}
-	}
-
-	/**
-	 * Try to look up a checkpoint by it's ID in the cache.
-	 *
-	 * @param checkpointId ID of the checkpoint to look up.
-	 * @return The checkpoint or <code>null</code> if checkpoint not found.
-	 */
-	AbstractCheckpointStats tryGet(long checkpointId) {
-		if (cache != null) {
-			return cache.getIfPresent(checkpointId);
-		} else {
-			return null;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
deleted file mode 100644
index 96cc3e0..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns checkpoint stats for a single job vertex.
- */
-public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid";
-
-	private final CheckpointStatsCache cache;
-
-	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
-		super(executionGraphHolder, executor);
-		this.cache = cache;
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				long checkpointId = parseCheckpointId(params);
-				if (checkpointId == -1) {
-					return "{}";
-				}
-
-				CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-				if (snapshot == null) {
-					return "{}";
-				}
-
-				AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
-				if (checkpoint != null) {
-					cache.tryAdd(checkpoint);
-				} else {
-					checkpoint = cache.tryGet(checkpointId);
-
-					if (checkpoint == null) {
-						return "{}";
-					}
-				}
-
-				try {
-					return createCheckpointDetailsJson(checkpoint);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create checkpoint details json.", e);
-				}
-			},
-			executor);
-	}
-
-	/**
-	 * Archivist for the CheckpointStatsDetails.
-	 */
-	public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
-			if (stats == null) {
-				return Collections.emptyList();
-			}
-			CheckpointStatsHistory history = stats.getHistory();
-			List<ArchivedJson> archive = new ArrayList<>();
-			for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
-				String json = createCheckpointDetailsJson(checkpoint);
-				String path = CHECKPOINT_STATS_DETAILS_REST_PATH
-					.replace(":jobid", graph.getJobID().toString())
-					.replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()));
-				archive.add(new ArchivedJson(path, json));
-			}
-			return archive;
-		}
-	}
-
-	public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-		gen.writeStartObject();
-
-		gen.writeNumberField("id", checkpoint.getCheckpointId());
-		gen.writeStringField("status", checkpoint.getStatus().toString());
-		gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
-		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
-		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
-		gen.writeNumberField("state_size", checkpoint.getStateSize());
-		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
-		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
-		gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
-		gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
-
-		if (checkpoint.getStatus().isCompleted()) {
-			// --- Completed ---
-			CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
-
-			String externalPath = completed.getExternalPath();
-			if (externalPath != null) {
-				gen.writeStringField("external_path", externalPath);
-			}
-
-			gen.writeBooleanField("discarded", completed.isDiscarded());
-		}
-		else if (checkpoint.getStatus().isFailed()) {
-			// --- Failed ---
-			FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
-
-			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
-
-			String failureMsg = failed.getFailureMessage();
-			if (failureMsg != null) {
-				gen.writeStringField("failure_message", failureMsg);
-			}
-		}
-
-		gen.writeObjectFieldStart("tasks");
-		for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
-			gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
-
-			gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
-			gen.writeNumberField("state_size", taskStats.getStateSize());
-			gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
-			gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
-			gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
-			gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
-
-			gen.writeEndObject();
-		}
-		gen.writeEndObject();
-
-		gen.writeEndObject();
-		gen.close();
-
-		return writer.toString();
-	}
-
-	/**
-	 * Returns the checkpoint ID parsed from the provided parameters.
-	 *
-	 * @param params Path parameters
-	 * @return Parsed checkpoint ID or <code>-1</code> if not available.
-	 */
-	static long parseCheckpointId(Map<String, String> params) {
-		String param = params.get("checkpointid");
-		if (param == null) {
-			return -1;
-		}
-
-		try {
-			return Long.parseLong(param);
-		} catch (NumberFormatException ignored) {
-			return -1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
deleted file mode 100644
index 045248b..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns checkpoint stats for a single job vertex with
- * the summary stats and all subtasks.
- */
-public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
-
-	private final CheckpointStatsCache cache;
-
-	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
-		super(executionGraphHolder, executor);
-		this.cache = checkNotNull(cache);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(
-			Map<String, String> pathParams,
-			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway) {
-		return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
-		if (checkpointId == -1) {
-			return CompletableFuture.completedFuture("{}");
-		}
-
-		JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
-		if (vertexId == null) {
-			return CompletableFuture.completedFuture("{}");
-		}
-
-		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-		if (snapshot == null) {
-			return CompletableFuture.completedFuture("{}");
-		}
-
-		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
-		if (checkpoint != null) {
-			cache.tryAdd(checkpoint);
-		} else {
-			checkpoint = cache.tryGet(checkpointId);
-
-			if (checkpoint == null) {
-				return CompletableFuture.completedFuture("{}");
-			}
-		}
-
-		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
-		if (taskStats == null) {
-			return CompletableFuture.completedFuture("{}");
-		}
-
-		try {
-			return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
-		} catch (IOException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
-	}
-
-	/**
-	 * Archivist for the CheckpointStatsDetailsSubtasksHandler.
-	 */
-	public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
-			if (stats == null) {
-				return Collections.emptyList();
-			}
-			CheckpointStatsHistory history = stats.getHistory();
-			List<ArchivedJson> archive = new ArrayList<>();
-			for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
-				for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) {
-					String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
-					String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
-						.replace(":jobid", graph.getJobID().toString())
-						.replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()))
-						.replace(":vertexid", subtaskStats.getJobVertexId().toString());
-					archive.add(new ArchivedJson(path, json));
-				}
-			}
-			return archive;
-		}
-	}
-
-	private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		gen.writeStartObject();
-		// Overview
-		gen.writeNumberField("id", checkpoint.getCheckpointId());
-		gen.writeStringField("status", checkpoint.getStatus().toString());
-		gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
-		gen.writeNumberField("state_size", taskStats.getStateSize());
-		gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
-		gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
-		gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
-		gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
-
-		if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
-			gen.writeObjectFieldStart("summary");
-			gen.writeObjectFieldStart("state_size");
-			writeMinMaxAvg(gen, taskStats.getSummaryStats().getStateSizeStats());
-			gen.writeEndObject();
-
-			gen.writeObjectFieldStart("end_to_end_duration");
-			MinMaxAvgStats ackTimestampStats = taskStats.getSummaryStats().getAckTimestampStats();
-			gen.writeNumberField("min", Math.max(0, ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
-			gen.writeNumberField("max", Math.max(0, ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
-			gen.writeNumberField("avg", Math.max(0, ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
-			gen.writeEndObject();
-
-			gen.writeObjectFieldStart("checkpoint_duration");
-			gen.writeObjectFieldStart("sync");
-			writeMinMaxAvg(gen, taskStats.getSummaryStats().getSyncCheckpointDurationStats());
-			gen.writeEndObject();
-			gen.writeObjectFieldStart("async");
-			writeMinMaxAvg(gen, taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
-			gen.writeEndObject();
-			gen.writeEndObject();
-
-			gen.writeObjectFieldStart("alignment");
-			gen.writeObjectFieldStart("buffered");
-			writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentBufferedStats());
-			gen.writeEndObject();
-			gen.writeObjectFieldStart("duration");
-			writeMinMaxAvg(gen, taskStats.getSummaryStats().getAlignmentDurationStats());
-			gen.writeEndObject();
-			gen.writeEndObject();
-			gen.writeEndObject();
-		}
-
-		SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
-
-		gen.writeArrayFieldStart("subtasks");
-		for (int i = 0; i < subtasks.length; i++) {
-			SubtaskStateStats subtask = subtasks[i];
-
-			gen.writeStartObject();
-			gen.writeNumberField("index", i);
-
-			if (subtask != null) {
-				gen.writeStringField("status", "completed");
-				gen.writeNumberField("ack_timestamp", subtask.getAckTimestamp());
-				gen.writeNumberField("end_to_end_duration", subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
-				gen.writeNumberField("state_size", subtask.getStateSize());
-
-				gen.writeObjectFieldStart("checkpoint");
-				gen.writeNumberField("sync", subtask.getSyncCheckpointDuration());
-				gen.writeNumberField("async", subtask.getAsyncCheckpointDuration());
-				gen.writeEndObject();
-
-				gen.writeObjectFieldStart("alignment");
-				gen.writeNumberField("buffered", subtask.getAlignmentBuffered());
-				gen.writeNumberField("duration", subtask.getAlignmentDuration());
-				gen.writeEndObject();
-			} else {
-				gen.writeStringField("status", "pending_or_failed");
-			}
-			gen.writeEndObject();
-		}
-		gen.writeEndArray();
-
-		gen.writeEndObject();
-		gen.close();
-
-		return writer.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
deleted file mode 100644
index a60aee0..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler that returns checkpoint statistics for a job.
- */
-public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler {
-
-	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
-
-	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{CHECKPOINT_STATS_REST_PATH};
-	}
-
-	@Override
-	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					return createCheckpointStatsJson(graph);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create checkpoint stats json.", e);
-				}
-			},
-			executor);
-	}
-
-	/**
-	 * Archivist for the CheckpointStatsJsonHandler.
-	 */
-	public static class CheckpointStatsJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			String json = createCheckpointStatsJson(graph);
-			String path = CHECKPOINT_STATS_REST_PATH
-				.replace(":jobid", graph.getJobID().toString());
-			return Collections.singletonList(new ArchivedJson(path, json));
-		}
-	}
-
-	private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-		if (snapshot == null) {
-			return "{}";
-		}
-
-		gen.writeStartObject();
-
-		// Counts
-		writeCounts(gen, snapshot.getCounts());
-
-		// Summary
-		writeSummary(gen, snapshot.getSummaryStats());
-
-		CheckpointStatsHistory history = snapshot.getHistory();
-
-		// Latest
-		writeLatestCheckpoints(
-			gen,
-			history.getLatestCompletedCheckpoint(),
-			history.getLatestSavepoint(),
-			history.getLatestFailedCheckpoint(),
-			snapshot.getLatestRestoredCheckpoint());
-
-		// History
-		writeHistory(gen, snapshot.getHistory());
-
-		gen.writeEndObject();
-		gen.close();
-
-		return writer.toString();
-	}
-
-	private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
-		gen.writeObjectFieldStart("counts");
-		gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
-		gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
-		gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
-		gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
-		gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
-		gen.writeEndObject();
-	}
-
-	private static void writeSummary(
-		JsonGenerator gen,
-		CompletedCheckpointStatsSummary summary) throws IOException {
-		gen.writeObjectFieldStart("summary");
-		gen.writeObjectFieldStart("state_size");
-		writeMinMaxAvg(gen, summary.getStateSizeStats());
-		gen.writeEndObject();
-
-		gen.writeObjectFieldStart("end_to_end_duration");
-		writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
-		gen.writeEndObject();
-
-		gen.writeObjectFieldStart("alignment_buffered");
-		writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
-		gen.writeEndObject();
-		gen.writeEndObject();
-	}
-
-	static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
-		gen.writeNumberField("min", minMaxAvg.getMinimum());
-		gen.writeNumberField("max", minMaxAvg.getMaximum());
-		gen.writeNumberField("avg", minMaxAvg.getAverage());
-	}
-
-	private static void writeLatestCheckpoints(
-		JsonGenerator gen,
-		@Nullable CompletedCheckpointStats completed,
-		@Nullable CompletedCheckpointStats savepoint,
-		@Nullable FailedCheckpointStats failed,
-		@Nullable RestoredCheckpointStats restored) throws IOException {
-
-		gen.writeObjectFieldStart("latest");
-		// Completed checkpoint
-		if (completed != null) {
-			gen.writeObjectFieldStart("completed");
-			writeCheckpoint(gen, completed);
-
-			String externalPath = completed.getExternalPath();
-			if (externalPath != null) {
-				gen.writeStringField("external_path", completed.getExternalPath());
-			}
-
-			gen.writeEndObject();
-		}
-
-		// Completed savepoint
-		if (savepoint != null) {
-			gen.writeObjectFieldStart("savepoint");
-			writeCheckpoint(gen, savepoint);
-
-			String externalPath = savepoint.getExternalPath();
-			if (externalPath != null) {
-				gen.writeStringField("external_path", savepoint.getExternalPath());
-			}
-			gen.writeEndObject();
-		}
-
-		// Failed checkpoint
-		if (failed != null) {
-			gen.writeObjectFieldStart("failed");
-			writeCheckpoint(gen, failed);
-
-			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
-			String failureMsg = failed.getFailureMessage();
-			if (failureMsg != null) {
-				gen.writeStringField("failure_message", failureMsg);
-			}
-			gen.writeEndObject();
-		}
-
-		// Restored checkpoint
-		if (restored != null) {
-			gen.writeObjectFieldStart("restored");
-			gen.writeNumberField("id", restored.getCheckpointId());
-			gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
-			gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
-
-			String externalPath = restored.getExternalPath();
-			if (externalPath != null) {
-				gen.writeStringField("external_path", externalPath);
-			}
-			gen.writeEndObject();
-		}
-		gen.writeEndObject();
-	}
-
-	private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
-		gen.writeNumberField("id", checkpoint.getCheckpointId());
-		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
-		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
-		gen.writeNumberField("state_size", checkpoint.getStateSize());
-		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
-		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
-
-	}
-
-	private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
-		gen.writeArrayFieldStart("history");
-		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
-			gen.writeStartObject();
-			gen.writeNumberField("id", checkpoint.getCheckpointId());
-			gen.writeStringField("status", checkpoint.getStatus().toString());
-			gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
-			gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
-			gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
-			gen.writeNumberField("state_size", checkpoint.getStateSize());
-			gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
-			gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
-			gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
-			gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
-
-			if (checkpoint.getStatus().isCompleted()) {
-				// --- Completed ---
-				CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
-
-				String externalPath = completed.getExternalPath();
-				if (externalPath != null) {
-					gen.writeStringField("external_path", externalPath);
-				}
-
-				gen.writeBooleanField("discarded", completed.isDiscarded());
-			}
-			else if (checkpoint.getStatus().isFailed()) {
-				// --- Failed ---
-				FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
-
-				gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
-
-				String failureMsg = failed.getFailureMessage();
-				if (failureMsg != null) {
-					gen.writeStringField("failure_message", failureMsg);
-				}
-			}
-
-			gen.writeEndObject();
-		}
-		gen.writeEndArray();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 6da8115..01228d5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -27,9 +27,9 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 0fc4314..bae8e21 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
 import org.apache.flink.util.FileUtils;
 
 import com.fasterxml.jackson.core.JsonFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index c5943dc..12a27a7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -26,7 +26,7 @@ package org.apache.flink.runtime.webmonitor.history;
  * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
-import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
deleted file mode 100644
index cf286ce..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
- * {@code [ { "id" : "X" } ] }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
-	private final MetricFetcher fetcher;
-
-	public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
-		super(executor);
-		this.fetcher = Preconditions.checkNotNull(fetcher);
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				fetcher.update();
-				String requestedMetricsList = queryParams.get("get");
-				try {
-					return requestedMetricsList != null
-						? getMetricsValues(pathParams, requestedMetricsList)
-						: getAvailableMetricsList(pathParams);
-				} catch (IOException e) {
-					throw new FlinkFutureException("Could not retrieve metrics.", e);
-				}
-			},
-			executor);
-
-	}
-
-	/**
-	 * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters.
-	 *
-	 * @param pathParams REST path parameters
-	 * @param metrics MetricStore containing all metrics
-	 * @return Map containing metrics, or null if no metric exists
-	 */
-	protected abstract Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics);
-
-	private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException {
-		if (requestedMetricsList.isEmpty()) {
-			/*
-			 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a
-			 * request for which the "get" parameter is an empty string.
-			 */
-			return "";
-		}
-		MetricStore metricStore = fetcher.getMetricStore();
-		synchronized (metricStore) {
-			Map<String, String> metrics = getMapFor(pathParams, metricStore);
-			if (metrics == null) {
-				return "";
-			}
-			String[] requestedMetrics = requestedMetricsList.split(",");
-
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-			gen.writeStartArray();
-			for (String requestedMetric : requestedMetrics) {
-				Object metricValue = metrics.get(requestedMetric);
-				if (metricValue != null) {
-					gen.writeStartObject();
-					gen.writeStringField("id", requestedMetric);
-					gen.writeStringField("value", metricValue.toString());
-					gen.writeEndObject();
-				}
-			}
-			gen.writeEndArray();
-
-			gen.close();
-			return writer.toString();
-		}
-	}
-
-	private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException {
-		MetricStore metricStore = fetcher.getMetricStore();
-		synchronized (metricStore) {
-			Map<String, String> metrics = getMapFor(pathParams, metricStore);
-			if (metrics == null) {
-				return "";
-			}
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-			gen.writeStartArray();
-			for (String m : metrics.keySet()) {
-				gen.writeStartObject();
-				gen.writeStringField("id", m);
-				gen.writeEndObject();
-			}
-			gen.writeEndArray();
-
-			gen.close();
-			return writer.toString();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
deleted file mode 100644
index 2bd6683..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class JobManagerMetricsHandler extends AbstractMetricsHandler {
-
-	private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
-
-	/**
-	 * @param executor passed to the superclass for request handling
-	 * @param fetcher  provides access to the MetricStore queried by this handler
-	 */
-	public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
-		super(executor, fetcher);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOBMANAGER_METRICS_REST_PATH};
-	}
-
-	/** Returns the JobManager-scoped metrics map; the path parameters are not used. */
-	@Override
-	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		// NOTE(review): MetricStore.getJobManagerMetricStore() currently returns a
-		// final, always-initialized field, so the null branch is purely defensive.
-		MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore();
-		if (jobManager == null) {
-			return null;
-		} else {
-			return jobManager.metrics;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
deleted file mode 100644
index e5e2500..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class JobMetricsHandler extends AbstractMetricsHandler {
-	/** Name of the path parameter carrying the job ID. */
-	public static final String PARAMETER_JOB_ID = "jobid";
-	private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
-
-	/**
-	 * @param executor passed to the superclass for request handling
-	 * @param fetcher  provides access to the MetricStore queried by this handler
-	 */
-	public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
-		super(executor, fetcher);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_METRICS_REST_PATH};
-	}
-
-	/** Returns the metrics map of the job addressed by the "jobid" path parameter, or null if unknown. */
-	@Override
-	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-		return job != null
-			? job.metrics
-			: null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
deleted file mode 100644
index 1d2cd84..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class JobVertexMetricsHandler extends AbstractMetricsHandler {
-	/** Name of the path parameter carrying the vertex ID. */
-	public static final String PARAMETER_VERTEX_ID = "vertexid";
-	private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
-
-	/**
-	 * @param executor passed to the superclass for request handling
-	 * @param fetcher  provides access to the MetricStore queried by this handler
-	 */
-	public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
-		super(executor, fetcher);
-	}
-
-	@Override
-	public String[] getPaths() {
-		return new String[]{JOB_VERTEX_METRICS_REST_PATH};
-	}
-
-	/**
-	 * Returns the metrics map of the task addressed by the "jobid" and "vertexid"
-	 * path parameters, or null if either the job or the vertex is unknown.
-	 */
-	@Override
-	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
-			pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
-			pathParams.get(PARAMETER_VERTEX_ID));
-		return task != null
-			? task.metrics
-			: null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
deleted file mode 100644
index a5f4ca5..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
-
-/**
- * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
- *
- * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
- * the last call has passed.
- */
-public class MetricFetcher {
-	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
-
-	// Retrieves the gateway of the current leading JobManager.
-	private final GatewayRetriever<JobManagerGateway> retriever;
-	// Retrieves MetricQueryService gateways by their service path.
-	private final MetricQueryServiceRetriever queryServiceRetriever;
-	// Executor on which all asynchronous completion callbacks run.
-	private final Executor executor;
-	// Timeout applied to the JobManager and query-service requests.
-	private final Time timeout;
-
-	// Shared store of all fetched metrics. The store is not thread-safe, so every
-	// access below synchronizes on the store object itself.
-	private final MetricStore metrics = new MetricStore();
-	private final MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
-
-	// Timestamp (milliseconds) of the last triggered fetch; guarded by "this" in update().
-	private long lastUpdateTime;
-
-	/**
-	 * Creates a MetricFetcher.
-	 *
-	 * @param retriever             retrieves the leading JobManager gateway
-	 * @param queryServiceRetriever retrieves MetricQueryService gateways by path
-	 * @param executor              executor for all asynchronous callbacks
-	 * @param timeout               timeout for remote requests
-	 */
-	public MetricFetcher(
-			GatewayRetriever<JobManagerGateway> retriever,
-			MetricQueryServiceRetriever queryServiceRetriever,
-			Executor executor,
-			Time timeout) {
-		this.retriever = Preconditions.checkNotNull(retriever);
-		this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever);
-		this.executor = Preconditions.checkNotNull(executor);
-		this.timeout = Preconditions.checkNotNull(timeout);
-	}
-
-	/**
-	 * Returns the MetricStore containing all stored metrics.
-	 *
-	 * @return MetricStore containing all stored metrics;
-	 */
-	public MetricStore getMetricStore() {
-		return metrics;
-	}
-
-	/**
-	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
-	 *
-	 * <p>An actual fetch is only triggered if at least 10 seconds have passed since the last one.
-	 */
-	public void update() {
-		synchronized (this) {
-			long currentTime = System.currentTimeMillis();
-			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
-				lastUpdateTime = currentTime;
-				fetchMetrics();
-			}
-		}
-	}
-
-	/**
-	 * Asynchronously fetches metrics from the JobManager and all registered TaskManagers,
-	 * and prunes stored metrics of jobs/TaskManagers that no longer exist. Failures of the
-	 * individual asynchronous requests are logged at DEBUG level; any other exception is
-	 * caught and logged at WARN level, so this method never throws.
-	 */
-	private void fetchMetrics() {
-		try {
-			Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow();
-			if (optJobManagerGateway.isPresent()) {
-				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
-
-				// Remove all metrics that belong to a job that is not running and no longer archived.
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
-
-				jobDetailsFuture.whenCompleteAsync(
-					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.debug("Fetching of JobDetails failed.", throwable);
-						} else {
-							ArrayList<String> toRetain = new ArrayList<>();
-							for (JobDetails job : jobDetails.getRunningJobs()) {
-								toRetain.add(job.getJobId().toString());
-							}
-							for (JobDetails job : jobDetails.getFinishedJobs()) {
-								toRetain.add(job.getJobId().toString());
-							}
-							synchronized (metrics) {
-								metrics.jobs.keySet().retainAll(toRetain);
-							}
-						}
-					},
-					executor);
-
-				// Derive the JobManager's query service path from its actor address.
-				String jobManagerPath = jobManagerGateway.getAddress();
-				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-
-				retrieveAndQueryMetrics(jmQueryServicePath);
-
-				// We first request the list of all registered task managers from the job manager,
-				// and then request the respective metric dump from each task manager.
-				//
-				// All stored metrics that do not belong to a registered task manager will be removed.
-				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-				taskManagersFuture.whenCompleteAsync(
-					(Collection<Instance> taskManagers, Throwable throwable) -> {
-						if (throwable != null) {
-							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
-						} else {
-							List<String> activeTaskManagers = taskManagers.stream().map(
-								taskManagerInstance -> {
-									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
-									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
-
-									retrieveAndQueryMetrics(tmQueryServicePath);
-
-									return taskManagerInstance.getId().toString();
-								}).collect(Collectors.toList());
-
-							synchronized (metrics) {
-								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
-							}
-						}
-					},
-					executor);
-			}
-		} catch (Exception e) {
-			LOG.warn("Exception while fetching metrics.", e);
-		}
-	}
-
-	/**
-	 * Retrieves and queries the specified QueryServiceGateway.
-	 *
-	 * @param queryServicePath specifying the QueryServiceGateway
-	 */
-	private void retrieveAndQueryMetrics(String queryServicePath) {
-		final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
-
-		queryServiceGatewayFuture.whenCompleteAsync(
-			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
-				if (t != null) {
-					LOG.debug("Could not retrieve QueryServiceGateway.", t);
-				} else {
-					queryMetrics(queryServiceGateway);
-				}
-			},
-			executor);
-	}
-
-	/**
-	 * Query the metrics from the given QueryServiceGateway.
-	 *
-	 * @param queryServiceGateway to query for metrics
-	 */
-	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
-		queryServiceGateway
-			.queryMetrics(timeout)
-			.whenCompleteAsync(
-				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
-					if (t != null) {
-						LOG.debug("Fetching metrics failed.", t);
-					} else {
-						List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
-						synchronized (metrics) {
-							for (MetricDump metric : dumpedMetrics) {
-								metrics.add(metric);
-							}
-						}
-					}
-				},
-				executor);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
deleted file mode 100644
index e36dca8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
-import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
-import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
-import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
-import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
-import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
-import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
-import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
-import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
-
-/**
- * Nested data-structure to store metrics.
- *
- * <p>This structure is not thread-safe.
- */
-public class MetricStore {
-	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
-
-	// Package-private so the MetricFetcher can prune stale entries directly.
-	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
-	// Keyed by TaskManager ID (from TaskManagerQueryScopeInfo#taskManagerID).
-	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
-	// Keyed by job ID string.
-	final Map<String, JobMetricStore> jobs = new HashMap<>();
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// Adding metrics
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Adds the given metric dump to the sub-store matching its scope category
-	 * (jobmanager, taskmanager, job, task or operator), creating intermediate
-	 * sub-stores as needed. Malformed dumps (empty name, unexpected scope type)
-	 * are logged at DEBUG level and otherwise ignored.
-	 *
-	 * @param metric the metric dump to add
-	 */
-	public void add(MetricDump metric) {
-		try {
-			QueryScopeInfo info = metric.scopeInfo;
-			TaskManagerMetricStore tm;
-			JobMetricStore job;
-			TaskMetricStore task;
-			SubtaskMetricStore subtask;
-
-			String name = info.scope.isEmpty()
-				? metric.name
-				: info.scope + "." + metric.name;
-
-			if (name.isEmpty()) { // malformed transmission
-				return;
-			}
-
-			switch (info.getCategory()) {
-				case INFO_CATEGORY_JM:
-					addMetric(jobManager.metrics, name, metric);
-					break;
-				case INFO_CATEGORY_TM:
-					String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
-					tm = taskManagers.get(tmID);
-					if (tm == null) {
-						tm = new TaskManagerMetricStore();
-						taskManagers.put(tmID, tm);
-					}
-					if (name.contains("GarbageCollector")) {
-						// track GC bean names separately so they can be listed per TaskManager
-						String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
-						tm.addGarbageCollectorName(gcName);
-					}
-					addMetric(tm.metrics, name, metric);
-					break;
-				case INFO_CATEGORY_JOB:
-					QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
-					job = jobs.get(jobInfo.jobID);
-					if (job == null) {
-						job = new JobMetricStore();
-						jobs.put(jobInfo.jobID, job);
-					}
-					addMetric(job.metrics, name, metric);
-					break;
-				case INFO_CATEGORY_TASK:
-					QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
-					job = jobs.get(taskInfo.jobID);
-					if (job == null) {
-						job = new JobMetricStore();
-						jobs.put(taskInfo.jobID, job);
-					}
-					task = job.tasks.get(taskInfo.vertexID);
-					if (task == null) {
-						task = new TaskMetricStore();
-						job.tasks.put(taskInfo.vertexID, task);
-					}
-					subtask = task.subtasks.get(taskInfo.subtaskIndex);
-					if (subtask == null) {
-						subtask = new SubtaskMetricStore();
-						task.subtasks.put(taskInfo.subtaskIndex, subtask);
-					}
-					/*
-					 * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers,
-					 * while the WebInterface task metric queries currently do not account for subtasks, so we don't
-					 * divide by subtask and instead use the concatenation of subtask index and metric name as the name
-					 * for those.
-					 */
-					addMetric(subtask.metrics, name, metric);
-					addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
-					break;
-				case INFO_CATEGORY_OPERATOR:
-					QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
-					job = jobs.get(operatorInfo.jobID);
-					if (job == null) {
-						job = new JobMetricStore();
-						jobs.put(operatorInfo.jobID, job);
-					}
-					task = job.tasks.get(operatorInfo.vertexID);
-					if (task == null) {
-						task = new TaskMetricStore();
-						job.tasks.put(operatorInfo.vertexID, task);
-					}
-					/*
-					 * As the WebInterface does not account for operators (because it can't) we don't
-					 * divide by operator and instead use the concatenation of subtask index, operator name and metric name
-					 * as the name.
-					 */
-					addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric);
-					break;
-				default:
-					LOG.debug("Invalid metric dump category: " + info.getCategory());
-			}
-		} catch (Exception e) {
-			LOG.debug("Malformed metric dump.", e);
-		}
-	}
-
-	/**
-	 * Stores the string representation of the given metric under the given name.
-	 * Histograms are expanded into one entry per statistic, using the suffixes
-	 * "_min", "_max", "_mean", "_median", "_stddev" and "_p75" through "_p999".
-	 * Metrics of an unknown category are silently ignored.
-	 *
-	 * @param target metric map of the targeted component sub-store
-	 * @param name   fully scoped metric name
-	 * @param metric metric dump whose value is stored
-	 */
-	private void addMetric(Map<String, String> target, String name, MetricDump metric) {
-		switch (metric.getCategory()) {
-			case METRIC_CATEGORY_COUNTER:
-				MetricDump.CounterDump counter = (MetricDump.CounterDump) metric;
-				target.put(name, String.valueOf(counter.count));
-				break;
-			case METRIC_CATEGORY_GAUGE:
-				MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric;
-				target.put(name, gauge.value);
-				break;
-			case METRIC_CATEGORY_HISTOGRAM:
-				MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric;
-				target.put(name + "_min", String.valueOf(histogram.min));
-				target.put(name + "_max", String.valueOf(histogram.max));
-				target.put(name + "_mean", String.valueOf(histogram.mean));
-				target.put(name + "_median", String.valueOf(histogram.median));
-				target.put(name + "_stddev", String.valueOf(histogram.stddev));
-				target.put(name + "_p75", String.valueOf(histogram.p75));
-				target.put(name + "_p90", String.valueOf(histogram.p90));
-				target.put(name + "_p95", String.valueOf(histogram.p95));
-				target.put(name + "_p98", String.valueOf(histogram.p98));
-				target.put(name + "_p99", String.valueOf(histogram.p99));
-				target.put(name + "_p999", String.valueOf(histogram.p999));
-				break;
-			case METRIC_CATEGORY_METER:
-				MetricDump.MeterDump meter = (MetricDump.MeterDump) metric;
-				target.put(name, String.valueOf(meter.rate));
-				break;
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// Accessors for sub MetricStores
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Returns the {@link JobManagerMetricStore}.
-	 *
-	 * @return JobManagerMetricStore
-	 */
-	public JobManagerMetricStore getJobManagerMetricStore() {
-		return jobManager;
-	}
-
-	/**
-	 * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
-	 *
-	 * @param tmID taskmanager ID
-	 * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
-	 */
-	public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
-		return taskManagers.get(tmID);
-	}
-
-	/**
-	 * Returns the {@link JobMetricStore} for the given job ID.
-	 *
-	 * @param jobID job ID
-	 * @return JobMetricStore for the given ID, or null if no store for the given argument exists
-	 */
-	public JobMetricStore getJobMetricStore(String jobID) {
-		return jobs.get(jobID);
-	}
-
-	/**
-	 * Returns the {@link TaskMetricStore} for the given job/task ID.
-	 *
-	 * @param jobID  job ID
-	 * @param taskID task ID
-	 * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
-	 */
-	public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
-		JobMetricStore job = getJobMetricStore(jobID);
-		if (job == null) {
-			return null;
-		}
-		return job.getTaskMetricStore(taskID);
-	}
-
-	/**
-	 * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index.
-	 *
-	 * @param jobID        job ID
-	 * @param taskID       task ID
-	 * @param subtaskIndex subtask index
-	 * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists
-	 */
-	public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) {
-		TaskMetricStore task = getTaskMetricStore(jobID, taskID);
-		if (task == null) {
-			return null;
-		}
-		return task.getSubtaskMetricStore(subtaskIndex);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// sub MetricStore classes
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/** Base class for all sub-stores; holds a flat name -> string-value metric map. */
-	private abstract static class ComponentMetricStore {
-		public final Map<String, String> metrics = new HashMap<>();
-
-		/**
-		 * Returns the value of the given metric, or the default value if the metric is not stored.
-		 *
-		 * @param name         metric name
-		 * @param defaultValue value returned if the metric is not stored
-		 * @return stored value, or the default value
-		 */
-		public String getMetric(String name, String defaultValue) {
-			String value = this.metrics.get(name);
-			return value != null
-				? value
-				: defaultValue;
-		}
-	}
-
-	/**
-	 * Sub-structure containing metrics of the JobManager.
-	 */
-	public static class JobManagerMetricStore extends ComponentMetricStore {
-	}
-
-	/**
-	 * Sub-structure containing metrics of a single TaskManager.
-	 */
-	public static class TaskManagerMetricStore extends ComponentMetricStore {
-		// Names of the GC beans seen for this TaskManager, extracted in MetricStore#add.
-		public final Set<String> garbageCollectorNames = new HashSet<>();
-
-		public void addGarbageCollectorName(String name) {
-			garbageCollectorNames.add(name);
-		}
-	}
-
-	/**
-	 * Sub-structure containing metrics of a single Job.
-	 */
-	public static class JobMetricStore extends ComponentMetricStore {
-		// Keyed by vertex ID.
-		private final Map<String, TaskMetricStore> tasks = new HashMap<>();
-
-		public TaskMetricStore getTaskMetricStore(String taskID) {
-			return tasks.get(taskID);
-		}
-	}
-
-	/**
-	 * Sub-structure containing metrics of a single Task.
-	 */
-	public static class TaskMetricStore extends ComponentMetricStore {
-		// Keyed by subtask index.
-		private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>();
-
-		public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
-			return subtasks.get(subtaskIndex);
-		}
-	}
-
-	/**
-	 * Sub-structure containing metrics of a single Subtask.
-	 */
-	public static class SubtaskMetricStore extends ComponentMetricStore {
-	}
-}


Mime
View raw message