Repository: flink
Updated Branches:
refs/heads/master 3137bf774 -> 0d2903541
[FLINK-4775] [metrics] Simplify MetricStore access
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d290354
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d290354
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d290354
Branch: refs/heads/master
Commit: 0d290354179a5ea3a11040a2ed7e218263bc474b
Parents: e30e7a6
Author: zentol <chesnay@apache.org>
Authored: Fri Oct 7 10:16:49 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Oct 21 13:42:18 2016 +0200
----------------------------------------------------------------------
.../metrics/JobManagerMetricsHandler.java | 2 +-
.../webmonitor/metrics/JobMetricsHandler.java | 10 +-
.../metrics/JobVertexMetricsHandler.java | 17 +--
.../runtime/webmonitor/metrics/MetricStore.java | 125 +++++++++++++++++--
.../metrics/TaskManagerMetricsHandler.java | 2 +-
.../webmonitor/metrics/MetricFetcherTest.java | 8 +-
.../webmonitor/metrics/MetricStoreTest.java | 10 +-
7 files changed, 134 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 54d6aea..7452c71 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -37,7 +37,7 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
@Override
protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
- MetricStore.JobManagerMetricStore jobManager = metrics.jobManager;
+ MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore();
if (jobManager == null) {
return null;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index cdaae2c..d66c954 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -39,11 +39,9 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
@Override
protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
- MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
- if (job == null) {
- return null;
- } else {
- return job.metrics;
- }
+ MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+ return job != null
+ ? job.metrics
+ : null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 1b92b47..6fca771 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -39,16 +39,11 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
@Override
protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
- MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
- if (job == null) {
- return null;
- } else {
- MetricStore.TaskMetricStore task = job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID));
- if (task == null) {
- return null;
- } else {
- return task.metrics;
- }
- }
+ MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+ pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+ pathParams.get(PARAMETER_VERTEX_ID));
+ return task != null
+ ? task.metrics
+ : null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index c1b2bec..989145b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -47,17 +47,21 @@ public class MetricStore {
final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
final Map<String, JobMetricStore> jobs = new HashMap<>();
+ // -----------------------------------------------------------------------------------------------------------------
+ // Adding metrics
+ // -----------------------------------------------------------------------------------------------------------------
public void add(MetricDump metric) {
try {
QueryScopeInfo info = metric.scopeInfo;
TaskManagerMetricStore tm;
JobMetricStore job;
TaskMetricStore task;
+ SubtaskMetricStore subtask;
String name = info.scope.isEmpty()
? metric.name
: info.scope + "." + metric.name;
-
+
if (name.isEmpty()) { // malformed transmission
return;
}
@@ -96,10 +100,18 @@ public class MetricStore {
task = new TaskMetricStore();
job.tasks.put(taskInfo.vertexID, task);
}
+ subtask = task.subtasks.get(taskInfo.subtaskIndex);
+ if (subtask == null) {
+ subtask = new SubtaskMetricStore();
+ task.subtasks.put(taskInfo.subtaskIndex, subtask);
+ }
/**
- * As the WebInterface task metric queries currently do not account for subtasks we don't
- * divide by subtask and instead use the concatenation of subtask index and metric name as the name.
+ * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers,
+ * while the WebInterface task metric queries currently do not account for subtasks, so we don't
+ * divide by subtask and instead use the concatenation of subtask index and metric name as the name
+ * for those.
*/
+ addMetric(subtask.metrics, name, metric);
addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
break;
case INFO_CATEGORY_OPERATOR:
@@ -160,32 +172,121 @@ public class MetricStore {
}
}
+ // -----------------------------------------------------------------------------------------------------------------
+ // Accessors for sub MetricStores
+ // -----------------------------------------------------------------------------------------------------------------
+
/**
- * Sub-structure containing metrics of the JobManager.
+ * Returns the {@link JobManagerMetricStore}.
+ *
+ * @return JobManagerMetricStore
+ */
+ public JobManagerMetricStore getJobManagerMetricStore() {
+ return jobManager;
+ }
+
+ /**
+ * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
+ *
+ * @param tmID taskmanager ID
+ * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
+ */
+ public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
+ return taskManagers.get(tmID);
+ }
+
+ /**
+ * Returns the {@link JobMetricStore} for the given job ID.
+ *
+ * @param jobID job ID
+ * @return JobMetricStore for the given ID, or null if no store for the given argument exists
+ */
+ public JobMetricStore getJobMetricStore(String jobID) {
+ return jobs.get(jobID);
+ }
+
+ /**
+ * Returns the {@link TaskMetricStore} for the given job/task ID.
+ *
+ * @param jobID job ID
+ * @param taskID task ID
+ * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
+ */
+ public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
+ JobMetricStore job = getJobMetricStore(jobID);
+ if (job == null) {
+ return null;
+ }
+ return job.getTaskMetricStore(taskID);
+ }
+
+ /**
+ * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index.
+ *
+ * @param jobID job ID
+ * @param taskID task ID
+ * @param subtaskIndex subtask index
+ * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists
*/
- static class JobManagerMetricStore {
+ public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) {
+ TaskMetricStore task = getTaskMetricStore(jobID, taskID);
+ if (task == null) {
+ return null;
+ }
+ return task.getSubtaskMetricStore(subtaskIndex);
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // sub MetricStore classes
+ // -----------------------------------------------------------------------------------------------------------------
+ private static abstract class ComponentMetricStore {
public final Map<String, String> metrics = new HashMap<>();
+
+ public String getMetric(String name, String defaultValue) {
+ String value = this.metrics.get(name);
+ return value != null
+ ? value
+ : defaultValue;
+ }
+ }
+
+ /**
+ * Sub-structure containing metrics of the JobManager.
+ */
+ public static class JobManagerMetricStore extends ComponentMetricStore {
}
/**
* Sub-structure containing metrics of a single TaskManager.
*/
- static class TaskManagerMetricStore {
- public final Map<String, String> metrics = new HashMap<>();
+ public static class TaskManagerMetricStore extends ComponentMetricStore {
}
/**
* Sub-structure containing metrics of a single Job.
*/
- static class JobMetricStore {
- public final Map<String, String> metrics = new HashMap<>();
- public final Map<String, TaskMetricStore> tasks = new HashMap<>();
+ public static class JobMetricStore extends ComponentMetricStore {
+ private final Map<String, TaskMetricStore> tasks = new HashMap<>();
+
+ public TaskMetricStore getTaskMetricStore(String taskID) {
+ return tasks.get(taskID);
+ }
}
/**
* Sub-structure containing metrics of a single Task.
*/
- static class TaskMetricStore {
- public final Map<String, String> metrics = new HashMap<>();
+ public static class TaskMetricStore extends ComponentMetricStore {
+ private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>();
+
+ public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
+ return subtasks.get(subtaskIndex);
+ }
+ }
+
+ /**
+ * Sub-structure containing metrics of a single Subtask.
+ */
+ public static class SubtaskMetricStore extends ComponentMetricStore {
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index e4e8b00..a69b676 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -39,7 +39,7 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
@Override
protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
- MetricStore.TaskManagerMetricStore taskManager = metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID));
+ MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(PARAMETER_TM_ID));
if (taskManager == null) {
return null;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 14cbeac..3061346 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -156,10 +156,10 @@ public class MetricFetcherTest extends TestLogger {
assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
- assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
- assertEquals("5.0", store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
- assertEquals("2", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
- assertEquals("1", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+ assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
+ assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
+ assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
+ assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
index ee46494..c71f015 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -31,11 +31,11 @@ public class MetricStoreTest extends TestLogger {
public void testAdd() throws IOException {
MetricStore store = setupStore(new MetricStore());
- assertEquals("0", store.jobManager.metrics.get("abc.metric1"));
- assertEquals("1", store.taskManagers.get("tmid").metrics.get("abc.metric2"));
- assertEquals("2", store.jobs.get("jobid").metrics.get("abc.metric3"));
- assertEquals("3", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
- assertEquals("4", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+ assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
+ assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
+ assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
+ assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1"));
+ assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1"));
}
@Test
|