flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [3/4] flink git commit: [FLINK-4389] Expose metrics to WebFrontend
Date Thu, 15 Sep 2016 17:18:40 GMT
[FLINK-4389] Expose metrics to WebFrontend

This closes #2363


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70704de0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70704de0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70704de0

Branch: refs/heads/master
Commit: 70704de0c82cbb7b143dd696221e11999feb3600
Parents: 545b72b
Author: zentol <chesnay@apache.org>
Authored: Fri Aug 5 13:54:37 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu Sep 15 19:17:52 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosTaskManager.scala     |   7 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  56 +---
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  14 +
 .../metrics/AbstractMetricsHandler.java         | 124 ++++++++
 .../metrics/JobManagerMetricsHandler.java       |  47 +++
 .../webmonitor/metrics/JobMetricsHandler.java   |  49 +++
 .../metrics/JobVertexMetricsHandler.java        |  54 ++++
 .../webmonitor/metrics/MetricFetcher.java       | 224 ++++++++++++++
 .../runtime/webmonitor/metrics/MetricStore.java | 190 ++++++++++++
 .../metrics/TaskManagerMetricsHandler.java      |  49 +++
 .../metrics/AbstractMetricsHandlerTest.java     | 154 ++++++++++
 .../metrics/JobManagerMetricsHandlerTest.java   |  61 ++++
 .../metrics/JobMetricsHandlerTest.java          |  63 ++++
 .../metrics/JobVertexMetricsHandlerTest.java    |  67 ++++
 .../webmonitor/metrics/MetricFetcherTest.java   | 216 +++++++++++++
 .../webmonitor/metrics/MetricStoreTest.java     |  83 +++++
 .../metrics/TaskManagerMetricsHandlerTest.java  |  63 ++++
 .../flink/runtime/metrics/MetricRegistry.java   |  24 ++
 .../flink/runtime/metrics/dump/MetricDump.java  | 138 +++++++++
 .../metrics/dump/MetricDumpSerialization.java   | 302 +++++++++++++++++++
 .../metrics/dump/MetricQueryService.java        | 217 +++++++++++++
 .../runtime/metrics/dump/QueryScopeInfo.java    | 189 ++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     |  25 ++
 .../metrics/groups/GenericMetricGroup.java      |  10 +
 .../metrics/groups/JobManagerMetricGroup.java   |   7 +
 .../runtime/metrics/groups/JobMetricGroup.java  |   7 +
 .../metrics/groups/OperatorMetricGroup.java     |  11 +
 .../metrics/groups/TaskManagerMetricGroup.java  |   7 +
 .../runtime/metrics/groups/TaskMetricGroup.java |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +
 .../minicluster/LocalFlinkMiniCluster.scala     |  14 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  43 +--
 .../metrics/dump/MetricDumpSerializerTest.java  | 178 +++++++++++
 .../runtime/metrics/dump/MetricDumpTest.java    |  86 ++++++
 .../metrics/dump/MetricQueryServiceTest.java    | 133 ++++++++
 .../metrics/dump/QueryScopeInfoTest.java        |  73 +++++
 .../metrics/groups/AbstractMetricGroupTest.java |   6 +
 .../metrics/groups/JobManagerGroupTest.java     |  14 +-
 .../metrics/groups/JobManagerJobGroupTest.java  |  17 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  31 +-
 .../metrics/groups/OperatorGroupTest.java       |  25 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  15 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |  17 +-
 .../metrics/groups/TaskMetricGroupTest.java     |  23 +-
 .../metrics/util/DummyCharacterFilter.java      |  27 ++
 .../runtime/metrics/util/TestingHistogram.java  |  73 +++++
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../testingUtils/TestingTaskManager.scala       |  13 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |   9 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   7 +-
 50 files changed, 3186 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 19b0c62..3972a57 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
@@ -36,7 +37,8 @@ class MesosTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
+    leaderRetrievalService: LeaderRetrievalService,
+    metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
     resourceID,
@@ -45,7 +47,8 @@ class MesosTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService) {
+    leaderRetrievalService,
+    metricRegistry) {
 
   override def handleMessage: Receive = {
     super.handleMessage

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 913999b..089efe3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.metrics.jmx;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -286,57 +285,4 @@ public class JMXReporterTest extends TestLogger {
 			}
 		}
 	}
-
-	static class TestingHistogram implements Histogram {
-
-		@Override
-		public void update(long value) {
-
-		}
-
-		@Override
-		public long getCount() {
-			return 1;
-		}
-
-		@Override
-		public HistogramStatistics getStatistics() {
-			return new HistogramStatistics() {
-				@Override
-				public double getQuantile(double quantile) {
-					return quantile;
-				}
-
-				@Override
-				public long[] getValues() {
-					return new long[0];
-				}
-
-				@Override
-				public int size() {
-					return 3;
-				}
-
-				@Override
-				public double getMean() {
-					return 4;
-				}
-
-				@Override
-				public double getStdDev() {
-					return 5;
-				}
-
-				@Override
-				public long getMax() {
-					return 6;
-				}
-
-				@Override
-				public long getMin() {
-					return 7;
-				}
-			};
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 2bfbb85..4dd36e7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -67,6 +67,11 @@ import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandl
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
+import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext$;
@@ -133,6 +138,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private ExecutorService executorService;
 
+	private MetricFetcher metricFetcher;
+
 	public WebRuntimeMonitor(
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
@@ -206,6 +213,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
 
+		metricFetcher = new MetricFetcher(actorSystem, retriever, context);
+
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -235,6 +244,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 							currentGraphs,
 							backPressureStatsTracker,
 							refreshInterval)))
+			.GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
@@ -245,6 +255,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))
+			.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
 
 			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
@@ -252,6 +263,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, config))
 			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", 
 				new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.STDOUT, config))
+			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher)))
 
 			// log and stdout
 			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
@@ -260,6 +272,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
 				new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
 
+			.GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher)))
+
 			// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
 			.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..54e4b6f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public abstract class AbstractMetricsHandler implements RequestHandler {
+	private final MetricFetcher fetcher;
+
+	public AbstractMetricsHandler(MetricFetcher fetcher) {
+		this.fetcher = Preconditions.checkNotNull(fetcher);
+	}
+
+	@Override
+	public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+		fetcher.update();
+		String requestedMetricsList = queryParams.get("get");
+		return requestedMetricsList != null
+			? getMetricsValues(pathParams, requestedMetricsList)
+			: getAvailableMetricsList(pathParams);
+	}
+
+	/**
+	 * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters.
+	 *
+	 * @param pathParams REST path parameters
+	 * @param metrics MetricStore containing all metrics
+	 * @return Map containing metrics, or null if no metric exists
+	 */
+	protected abstract Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics);
+
+	private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException {
+		if (requestedMetricsList.isEmpty()) {
+			/**
+			 * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a 
+			 * request for which the "get" parameter is an empty string.
+			 */
+			return "";
+		}
+		MetricStore metricStore = fetcher.getMetricStore();
+		synchronized (metricStore) {
+			Map<String, Object> metrics = getMapFor(pathParams, metricStore);
+			if (metrics == null) {
+				return "";
+			}
+			String[] requestedMetrics = requestedMetricsList.split(",");
+
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+			gen.writeStartArray();
+			for (String requestedMetric : requestedMetrics) {
+				Object metricValue = metrics.get(requestedMetric);
+				if (metricValue != null) {
+					gen.writeStartObject();
+					gen.writeStringField("id", requestedMetric);
+					gen.writeStringField("value", metricValue.toString());
+					gen.writeEndObject();
+				}
+			}
+			gen.writeEndArray();
+
+			gen.close();
+			return writer.toString();
+		}
+	}
+
+	private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException {
+		MetricStore metricStore = fetcher.getMetricStore();
+		synchronized (metricStore) {
+			Map<String, Object> metrics = getMapFor(pathParams, metricStore);
+			if (metrics == null) {
+				return "";
+			}
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+			gen.writeStartArray();
+			for (String m : metrics.keySet()) {
+				gen.writeStartObject();
+				gen.writeStringField("id", m);
+				gen.writeEndObject();
+			}
+			gen.writeEndArray();
+
+			gen.close();
+			return writer.toString();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
new file mode 100644
index 0000000..7435643
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+	public JobManagerMetricsHandler(MetricFetcher fetcher) {
+		super(fetcher);
+	}
+
+	@Override
+	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.JobManagerMetricStore jobManager = metrics.jobManager;
+		if (jobManager == null) {
+			return null;
+		} else {
+			return jobManager.metrics;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
new file mode 100644
index 0000000..b54799d
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobMetricsHandler extends AbstractMetricsHandler {
+	public static final String PARAMETER_JOB_ID = "jobid";
+
+	public JobMetricsHandler(MetricFetcher fetcher) {
+		super(fetcher);
+	}
+
+	@Override
+	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
+		if (job == null) {
+			return null;
+		} else {
+			return job.metrics;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..73b8bb0
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+	public static final String PARAMETER_VERTEX_ID = "vertexid";
+
+	public JobVertexMetricsHandler(MetricFetcher fetcher) {
+		super(fetcher);
+	}
+
+	@Override
+	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
+		if (job == null) {
+			return null;
+		} else {
+			MetricStore.TaskMetricStore task = job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID));
+			if (task == null) {
+				return null;
+			} else {
+				return task.metrics;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
new file mode 100644
index 0000000..7a39a53
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
+ * the last call has passed.
+ */
+public class MetricFetcher {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+	private final ActorSystem actorSystem;
+	private final JobManagerRetriever retriever;
+	private final ExecutionContext ctx;
+	private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+
+	private MetricStore metrics = new MetricStore();
+	private MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
+
+	private long lastUpdateTime;
+
+	public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever retriever, ExecutionContext ctx) {
+		this.actorSystem = Preconditions.checkNotNull(actorSystem);
+		this.retriever = Preconditions.checkNotNull(retriever);
+		this.ctx = Preconditions.checkNotNull(ctx);
+	}
+
+	/**
+	 * Returns the MetricStore containing all stored metrics.
+	 *
+	 * @return MetricStore containing all stored metrics;
+	 */
+	public MetricStore getMetricStore() {
+		return metrics;
+	}
+
+	/**
+	 * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+	 */
+	public void update() {
+		synchronized (this) {
+			long currentTime = System.currentTimeMillis();
+			if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+				lastUpdateTime = currentTime;
+				fetchMetrics();
+			}
+		}
+	}
+
+	private void fetchMetrics() {
+		try {
+			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
+			if (jobManagerGatewayAndWebPort.isDefined()) {
+				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+
+				/**
+				 * Remove all metrics that belong to a job that is not running and no longer archived.
+				 */
+				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
+				jobDetailsFuture
+					.onSuccess(new OnSuccess<Object>() {
+						@Override
+						public void onSuccess(Object result) throws Throwable {
+							MultipleJobsDetails details = (MultipleJobsDetails) result;
+							ArrayList<String> toRetain = new ArrayList<>();
+							for (JobDetails job : details.getRunningJobs()) {
+								toRetain.add(job.getJobId().toString());
+							}
+							for (JobDetails job : details.getFinishedJobs()) {
+								toRetain.add(job.getJobId().toString());
+							}
+							synchronized (metrics) {
+								metrics.jobs.keySet().retainAll(toRetain);
+							}
+						}
+					}, ctx);
+				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
+
+				String jobManagerPath = jobManager.path();
+				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + "MetricQueryService";
+				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
+
+				queryMetrics(jobManagerQueryService);
+
+				/**
+				 * We first request the list of all registered task managers from the job manager, and then
+				 * request the respective metric dump from each task manager.
+				 *
+				 * All stored metrics that do not belong to a registered task manager will be removed.
+				 */
+				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
+				registeredTaskManagersFuture
+					.onSuccess(new OnSuccess<Object>() {
+						@Override
+						public void onSuccess(Object result) throws Throwable {
+							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
+							List<String> activeTaskManagers = new ArrayList<>();
+							for (Instance taskManager : taskManagers) {
+								activeTaskManagers.add(taskManager.getId().toString());
+
+								String taskManagerPath = taskManager.getActorGateway().path();
+								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + "MetricQueryService";
+								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
+
+								queryMetrics(taskManagerQueryService);
+							}
+							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
+								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
+							}
+						}
+					}, ctx);
+				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while fetching metrics.", e);
+		}
+	}
+
+	private void logErrorOnFailure(Future<Object> future, final String message) {
+		future.onFailure(new OnFailure() {
+			@Override
+			public void onFailure(Throwable failure) throws Throwable {
+				LOG.warn(message, failure);
+			}
+		}, ctx);
+	}
+
+	/**
+	 * Requests a metric dump from the given actor.
+	 *
+	 * @param actor ActorRef to request the dump from
+     */
+	private void queryMetrics(ActorRef actor) {
+		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
+		metricQueryFuture
+			.onSuccess(new OnSuccess<Object>() {
+				@Override
+				public void onSuccess(Object result) throws Throwable {
+					addMetrics(result);
+				}
+			}, ctx);
+		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
+	}
+
+	private void addMetrics(Object result) throws IOException {
+		byte[] data = (byte[]) result;
+		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
+		for (MetricDump metric : dumpedMetrics) {
+			metrics.add(metric);
+		}
+	}
+
+	/**
+	 * Helper class that allows mocking of the answer.
+     */
+	static class BasicGateway {
+		private final ActorRef actor;
+
+		private BasicGateway(ActorRef actor) {
+			this.actor = actor;
+		}
+
+		/**
+		 * Sends a message asynchronously and returns its response. The response to the message is
+		 * returned as a future.
+		 *
+		 * @param message Message to be sent
+		 * @param timeout Timeout until the Future is completed with an AskTimeoutException
+		 * @return Future which contains the response to the sent message
+		 */
+		public Future<Object> ask(Object message, FiniteDuration timeout) {
+			return Patterns.ask(actor, message, new Timeout(timeout));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
new file mode 100644
index 0000000..41e68cc
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+	private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+	final JobManagerMetricStore jobManager = new JobManagerMetricStore();
+	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
+	final Map<String, JobMetricStore> jobs = new HashMap<>();
+
+	public void add(MetricDump metric) {
+		try {
+			QueryScopeInfo info = metric.scopeInfo;
+			TaskManagerMetricStore tm;
+			JobMetricStore job;
+			TaskMetricStore task;
+
+			String name = info.scope.isEmpty()
+				? metric.name
+				: info.scope + "." + metric.name;
+			
+			if (name.isEmpty()) { // malformed transmission
+				return;
+			}
+
+			switch (info.getCategory()) {
+				case INFO_CATEGORY_JM:
+					addMetric(jobManager.metrics, name, metric);
+				case INFO_CATEGORY_TM:
+					String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+					tm = taskManagers.get(tmID);
+					if (tm == null) {
+						tm = new TaskManagerMetricStore();
+						taskManagers.put(tmID, tm);
+					}
+					addMetric(tm.metrics, name, metric);
+					break;
+				case INFO_CATEGORY_JOB:
+					QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+					job = jobs.get(jobInfo.jobID);
+					if (job == null) {
+						job = new JobMetricStore();
+						jobs.put(jobInfo.jobID, job);
+					}
+					addMetric(job.metrics, name, metric);
+					break;
+				case INFO_CATEGORY_TASK:
+					QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+					job = jobs.get(taskInfo.jobID);
+					if (job == null) {
+						job = new JobMetricStore();
+						jobs.put(taskInfo.jobID, job);
+					}
+					task = job.tasks.get(taskInfo.vertexID);
+					if (task == null) {
+						task = new TaskMetricStore();
+						job.tasks.put(taskInfo.vertexID, task);
+					}
+					/**
+					 * As the WebInterface task metric queries currently do not account for subtasks we don't 
+					 * divide by subtask and instead use the concatenation of subtask index and metric name as the name. 
+					 */
+					addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+					break;
+				case INFO_CATEGORY_OPERATOR:
+					QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+					job = jobs.get(operatorInfo.jobID);
+					if (job == null) {
+						job = new JobMetricStore();
+						jobs.put(operatorInfo.jobID, job);
+					}
+					task = job.tasks.get(operatorInfo.vertexID);
+					if (task == null) {
+						task = new TaskMetricStore();
+						job.tasks.put(operatorInfo.vertexID, task);
+					}
+					/**
+					 * As the WebInterface does not account for operators (because it can't) we don't 
+					 * divide by operator and instead use the concatenation of subtask index, operator name and metric name 
+					 * as the name.
+					 */
+					addMetric(task.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric);
+					break;
+				default:
+					LOG.debug("Invalid metric dump category: " + info.getCategory());
+			}
+		} catch (Exception e) {
+			LOG.debug("Malformed metric dump.", e);
+		}
+	}
+
+	private void addMetric(Map<String, Object> target, String name, MetricDump metric) {
+		switch (metric.getCategory()) {
+			case METRIC_CATEGORY_COUNTER:
+				MetricDump.CounterDump counter = (MetricDump.CounterDump) metric;
+				target.put(name, counter.count);
+				break;
+			case METRIC_CATEGORY_GAUGE:
+				MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric;
+				target.put(name, gauge.value);
+				break;
+			case METRIC_CATEGORY_HISTOGRAM:
+				MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric;
+				target.put(name + "_min", histogram.min);
+				target.put(name + "_max", histogram.max);
+				target.put(name + "_mean", histogram.mean);
+				target.put(name + "_median", histogram.median);
+				target.put(name + "_stddev", histogram.stddev);
+				target.put(name + "_p75", histogram.p75);
+				target.put(name + "_p90", histogram.p90);
+				target.put(name + "_p95", histogram.p95);
+				target.put(name + "_p98", histogram.p98);
+				target.put(name + "_p99", histogram.p99);
+				target.put(name + "_p999", histogram.p999);
+				break;
+			case METRIC_CATEGORY_METER:
+				MetricDump.MeterDump meter = (MetricDump.MeterDump) metric;
+				target.put(name, meter.rate);
+				break;
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of the JobManager.
+	 */
+	static class JobManagerMetricStore {
+		public final Map<String, Object> metrics = new HashMap<>();
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single TaskManager.
+	 */
+	static class TaskManagerMetricStore {
+		public final Map<String, Object> metrics = new HashMap<>();
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single Job.
+	 */
+	static class JobMetricStore {
+		public final Map<String, Object> metrics = new HashMap<>();
+		public final Map<String, TaskMetricStore> tasks = new HashMap<>();
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single Task.
+	 */
+	static class TaskMetricStore {
+		public final Map<String, Object> metrics = new HashMap<>();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
new file mode 100644
index 0000000..fea3d07
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separate list of metric names is expected as a value.
+ * {@code /get?X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+	public static final String PARAMETER_TM_ID = "tmid";
+
+	public TaskManagerMetricsHandler(MetricFetcher fetcher) {
+		super(fetcher);
+	}
+
+	@Override
+	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+		MetricStore.TaskManagerMetricStore taskManager = metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID));
+		if (taskManager == null) {
+			return null;
+		} else {
+			return taskManager.metrics;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
new file mode 100644
index 0000000..483dbf6
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricsHandlerTest extends TestLogger {
+	/**
+	 * Verifies that the handlers correctly handle expected REST calls
+	 */
+	@Test
+	public void testHandleRequest() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		Map<String, String> queryParams = new HashMap<>();
+
+		pathParams.put("jobid", "jobid");
+		pathParams.put("vertexid", "taskid");
+
+		// get list of available metrics
+		String availableList = handler.handleRequest(pathParams, queryParams, null);
+
+		assertEquals("[" +
+				"{\"id\":\"8.opname.abc.metric5\"}," +
+				"{\"id\":\"8.abc.metric4\"}" +
+				"]",
+			availableList);
+
+		// get value for a single metric
+		queryParams.put("get", "8.opname.abc.metric5");
+
+		String metricValue = handler.handleRequest(pathParams, queryParams, null);
+
+		assertEquals("[" +
+				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
+				"]"
+			, metricValue
+		);
+
+		// get values for multiple metrics
+		queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
+
+		String metricValues = handler.handleRequest(pathParams, queryParams, null);
+
+		assertEquals("[" +
+				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
+				"{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" +
+				"]",
+			metricValues
+		);
+	}
+
+	/**
+	 * Verifies that a malformed request for available metrics does not throw an exception.
+	 */
+	@Test
+	public void testInvalidListDoesNotFail() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		Map<String, String> queryParams = new HashMap<>();
+
+		pathParams.put("jobid", "jobid");
+		pathParams.put("vertexid", "taskid");
+
+		//-----invalid variable
+		pathParams.put("jobid", "nonexistent");
+
+		try {
+			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+		} catch (Exception e) {
+			fail();
+		}
+	}
+
+	/**
+	 * Verifies that a malformed request for a metric value does not throw an exception.
+	 */
+	@Test
+	public void testInvalidGetDoesNotFail() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		Map<String, String> queryParams = new HashMap<>();
+
+		pathParams.put("jobid", "jobid");
+		pathParams.put("vertexid", "taskid");
+
+		//-----empty string
+		queryParams.put("get", "");
+
+		try {
+			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		//-----invalid variable
+		pathParams.put("jobid", "nonexistent");
+		queryParams.put("get", "subindex.opname.abc.metric5");
+
+		try {
+			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		//-----invalid metric
+		pathParams.put("jobid", "nonexistant");
+		queryParams.put("get", "subindex.opname.abc.nonexistant");
+
+		try {
+			assertEquals("", handler.handleRequest(pathParams, queryParams, null));
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..9757574
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class JobManagerMetricsHandlerTest extends TestLogger {
+	@Test
+	public void getMapFor() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals(0L, metrics.get("abc.metric1"));
+	}
+
+	@Test
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = fetcher.getMetricStore();
+
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertNotNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
new file mode 100644
index 0000000..c0cc345
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class JobMetricsHandlerTest extends TestLogger {
+	@Test
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(PARAMETER_JOB_ID, "jobid");
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals(2L, metrics.get("abc.metric3"));
+	}
+
+	@Test
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = fetcher.getMetricStore();
+
+		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
new file mode 100644
index 0000000..d6e5ca7
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class JobVertexMetricsHandlerTest extends TestLogger {
+	@Test
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(PARAMETER_JOB_ID, "jobid");
+		pathParams.put(PARAMETER_VERTEX_ID, "taskid");
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals(3L, metrics.get("8.abc.metric4"));
+
+		assertEquals(4L, metrics.get("8.opname.abc.metric5"));
+	}
+
+	@Test
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = fetcher.getMetricStore();
+
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
new file mode 100644
index 0000000..e0cfe26
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(MetricFetcher.class)
+public class MetricFetcherTest extends TestLogger {
+	@Test
+	public void testUpdate() throws Exception {
+		// ========= setup TaskManager =================================================================================
+		JobID jobID = new JobID();
+		InstanceID tmID = new InstanceID();
+		ActorGateway taskManagerGateway = mock(ActorGateway.class);
+		when(taskManagerGateway.path()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getActorGateway()).thenReturn(taskManagerGateway);
+		when(taskManager.getId()).thenReturn(tmID);
+
+		// ========= setup JobManager ==================================================================================
+		JobDetails details = mock(JobDetails.class);
+		when(details.getJobId()).thenReturn(jobID);
+
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+		when(jobManagerGateway.ask(isA(RequestJobDetails.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+		when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+		when(retriever.getJobManagerGatewayAndWebPort())
+			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+		// ========= setup QueryServices ================================================================================
+		Object requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+		final ActorRef jmQueryService = mock(ActorRef.class);
+		final ActorRef tmQueryService = mock(ActorRef.class);
+
+		ActorSystem actorSystem = mock(ActorSystem.class);
+		when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
+		when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService);
+
+		MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
+		when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
+
+		MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
+		when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(requestMetricsAnswer));
+
+		whenNew(MetricFetcher.BasicGateway.class)
+			.withArguments(eq(new Object() {
+				@Override
+				public boolean equals(Object o) {
+					return o == jmQueryService;
+				}
+			}))
+			.thenReturn(jmQueryServiceGateway);
+		whenNew(MetricFetcher.BasicGateway.class)
+			.withArguments(eq(new Object() {
+				@Override
+				public boolean equals(Object o) {
+					return o == tmQueryService;
+				}
+			}))
+			.thenReturn(tmQueryServiceGateway);
+
+		// ========= start MetricFetcher testing =======================================================================
+		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor());
+		MetricFetcher fetcher = new MetricFetcher(actorSystem, retriever, context);
+
+		// verify that update fetches metrics and updates the store
+		fetcher.update();
+		MetricStore store = fetcher.getMetricStore();
+		synchronized (store) {
+			assertEquals(7L, store.jobManager.metrics.get("abc.hist_min"));
+			assertEquals(6L, store.jobManager.metrics.get("abc.hist_max"));
+			assertEquals(4.0, store.jobManager.metrics.get("abc.hist_mean"));
+			assertEquals(0.5, store.jobManager.metrics.get("abc.hist_median"));
+			assertEquals(5.0, store.jobManager.metrics.get("abc.hist_stddev"));
+			assertEquals(0.75, store.jobManager.metrics.get("abc.hist_p75"));
+			assertEquals(0.9, store.jobManager.metrics.get("abc.hist_p90"));
+			assertEquals(0.95, store.jobManager.metrics.get("abc.hist_p95"));
+			assertEquals(0.98, store.jobManager.metrics.get("abc.hist_p98"));
+			assertEquals(0.99, store.jobManager.metrics.get("abc.hist_p99"));
+			assertEquals(0.999, store.jobManager.metrics.get("abc.hist_p999"));
+
+			assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
+			assertEquals(5.0, store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
+			assertEquals(2L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
+			assertEquals(1L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+		}
+	}
+
+	public class CurrentThreadExecutor implements Executor {
+		public void execute(Runnable r) {
+			r.run();
+		}
+	}
+
+	private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+		SimpleCounter c1 = new SimpleCounter();
+		SimpleCounter c2 = new SimpleCounter();
+		
+		c1.inc(1);
+		c2.inc(2);
+
+		counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc"));
+		counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
+		meters.put(new Meter() {
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 5;
+			}
+
+			@Override
+			public long getCount() {
+				return 10;
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
+		gauges.put(new Gauge<String>() {
+			@Override
+			public String getValue() {
+				return "x";
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
+		histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
+
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+		byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
+		serializer.close();
+
+		return dump;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
new file mode 100644
index 0000000..9dc2929
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class MetricStoreTest extends TestLogger {
+	@Test
+	public void testAdd() throws IOException {
+		MetricStore store = setupStore(new MetricStore());
+
+		assertEquals(0L, store.jobManager.metrics.get("abc.metric1"));
+		assertEquals(1L, store.taskManagers.get("tmid").metrics.get("abc.metric2"));
+		assertEquals(2L, store.jobs.get("jobid").metrics.get("abc.metric3"));
+		assertEquals(3L, store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
+		assertEquals(4L, store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+	}
+
+	@Test
+	public void testMalformedNameHandling() {
+		MetricStore store = new MetricStore();
+		//-----verify that no exceptions are thrown
+		
+		// null
+		store.add(null);
+		// empty name
+		QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo("");
+		MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "", 0);
+		store.add(cd);
+
+		//-----verify that no side effects occur
+		assertEquals(0, store.jobManager.metrics.size());
+		assertEquals(0, store.taskManagers.size());
+		assertEquals(0, store.jobs.size());
+	}
+
+	public static MetricStore setupStore(MetricStore store) {
+		QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
+		MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
+
+		QueryScopeInfo.TaskManagerQueryScopeInfo tm = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
+		MetricDump.CounterDump cd2 = new MetricDump.CounterDump(tm, "metric2", 1);
+
+		QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+		MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job, "metric3", 2);
+
+		QueryScopeInfo.TaskQueryScopeInfo task = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
+		MetricDump.CounterDump cd4 = new MetricDump.CounterDump(task, "metric4", 3);
+
+		QueryScopeInfo.OperatorQueryScopeInfo operator = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
+		MetricDump.CounterDump cd5 = new MetricDump.CounterDump(operator, "metric5", 4);
+
+		store.add(cd1);
+		store.add(cd2);
+		store.add(cd3);
+		store.add(cd4);
+		store.add(cd5);
+		
+		return store;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..6299a56
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler.PARAMETER_TM_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class TaskManagerMetricsHandlerTest extends TestLogger {
+	@Test
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(PARAMETER_TM_ID, "tmid");
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals(1L, metrics.get("abc.metric2"));
+	}
+
+	@Test
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = fetcher.getMetricStore();
+
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 763ea66..9c96858 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.metrics;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +28,8 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.slf4j.Logger;
@@ -47,6 +51,7 @@ public class MetricRegistry {
 	
 	private List<MetricReporter> reporters;
 	private ScheduledExecutorService executor;
+	private ActorRef queryService;
 
 	private final ScopeFormats scopeFormats;
 
@@ -144,6 +149,19 @@ public class MetricRegistry {
 		}
 	}
 
+	/**
+	 * Initializes the MetricQueryService.
+	 * 
+	 * @param actorSystem ActorSystem to create the MetricQueryService on
+     */
+	public void startQueryService(ActorSystem actorSystem) {
+		try {
+			queryService = MetricQueryService.startMetricQueryService(actorSystem);
+		} catch (Exception e) {
+			LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+		}
+	}
+
 	public char getDelimiter() {
 		return this.delimiter;
 	}
@@ -207,6 +225,9 @@ public class MetricRegistry {
 					}
 				}
 			}
+			if (queryService != null) {
+				MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, (AbstractMetricGroup) group);
+			}
 		} catch (Exception e) {
 			LOG.error("Error while registering metric.", e);
 		}
@@ -228,6 +249,9 @@ public class MetricRegistry {
 					}
 				}
 			}
+			if (queryService != null) {
+				MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+			}
 		} catch (Exception e) {
 			LOG.error("Error while registering metric.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
new file mode 100644
index 0000000..2239b50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A container for a dumped metric that contains the scope, name and value(s) of the metric.
+ */
+public abstract class MetricDump {
+	/** Categories to be returned by {@link MetricDump#getCategory()} to avoid instanceof checks. */
+	public static final byte METRIC_CATEGORY_COUNTER = 0;
+	public static final byte METRIC_CATEGORY_GAUGE = 1;
+	public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+	public static final byte METRIC_CATEGORY_METER = 3;
+
+	/** The scope information for the stored metric. */
+	public final QueryScopeInfo scopeInfo;
+	/** The name of the stored metric. */
+	public final String name;
+
+	private MetricDump(QueryScopeInfo scopeInfo, String name) {
+		this.scopeInfo = Preconditions.checkNotNull(scopeInfo);
+		this.name = Preconditions.checkNotNull(name);
+	}
+
+	/**
+	 * Returns the category for this MetricDump.
+	 *
+	 * @return category
+	 */
+	public abstract byte getCategory();
+
+	/**
+	 * Container for the value of a {@link org.apache.flink.metrics.Counter}.
+	 */
+	public static class CounterDump extends MetricDump {
+		public final long count;
+
+		public CounterDump(QueryScopeInfo scopeInfo, String name, long count) {
+			super(scopeInfo, name);
+			this.count = count;
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_COUNTER;
+		}
+	}
+
+	/**
+	 * Container for the value of a {@link org.apache.flink.metrics.Gauge} as a string.
+	 */
+	public static class GaugeDump extends MetricDump {
+		public final String value;
+
+		public GaugeDump(QueryScopeInfo scopeInfo, String name, String value) {
+			super(scopeInfo, name);
+			this.value = Preconditions.checkNotNull(value);
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_GAUGE;
+		}
+	}
+
+	/**
+	 * Container for the values of a {@link org.apache.flink.metrics.Histogram}.
+	 */
+	public static class HistogramDump extends MetricDump {
+		public long min;
+		public long max;
+		public double mean;
+		public double median;
+		public double stddev;
+		public double p75;
+		public double p90;
+		public double p95;
+		public double p98;
+		public double p99;
+		public double p999;
+
+		public HistogramDump(QueryScopeInfo scopeInfo, String name,
+			long min, long max, double mean, double median, double stddev,
+			double p75, double p90, double p95, double p98, double p99, double p999) {
+
+			super(scopeInfo, name);
+			this.min = min;
+			this.max = max;
+			this.mean = mean;
+			this.median = median;
+			this.stddev = stddev;
+			this.p75 = p75;
+			this.p90 = p90;
+			this.p95 = p95;
+			this.p98 = p98;
+			this.p99 = p99;
+			this.p999 = p999;
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_HISTOGRAM;
+		}
+	}
+
+	/**
+	 * Container for the rate of a {@link org.apache.flink.metrics.Meter}. 
+	 */
+	public static class MeterDump extends MetricDump {
+		public final double rate;
+
+		public MeterDump(QueryScopeInfo scopeInfo, String name, double rate) {
+			super(scopeInfo, name);
+			this.rate = rate;
+		}
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_METER;
+		}
+	}
+}


Mime
View raw message