flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [3/5] flink git commit: [FLINK-456] Basic JM Metric Infrastructure
Date Fri, 01 Jul 2016 13:09:30 GMT
[FLINK-456] Basic JM Metric Infrastructure

This closes #2146


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a9fd11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a9fd11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a9fd11

Branch: refs/heads/master
Commit: a3a9fd1147aa926987420057f8305ab498519a45
Parents: a11c1c6
Author: zentol <chesnay@apache.org>
Authored: Fri Jul 1 14:12:44 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Jul 1 15:09:16 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/MetricRegistry.java    |  14 ++-
 .../groups/JobManagerJobMetricGroup.java        |  69 +++++++++++
 .../metrics/groups/JobManagerMetricGroup.java   | 104 ++++++++++++++++
 .../flink/metrics/groups/JobMetricGroup.java    |  99 ++-------------
 .../groups/TaskManagerJobMetricGroup.java       | 122 +++++++++++++++++++
 .../metrics/groups/TaskManagerMetricGroup.java  |  12 +-
 .../flink/metrics/groups/TaskMetricGroup.java   |   8 +-
 .../flink/metrics/groups/scope/ScopeFormat.java |  84 +++++++++++--
 .../metrics/groups/scope/ScopeFormats.java      |  27 +++-
 .../flink/metrics/MetricRegistryTest.java       |   4 +-
 .../flink/metrics/groups/JobGroupTest.java      |  94 --------------
 .../metrics/groups/JobManagerGroupTest.java     | 108 ++++++++++++++++
 .../metrics/groups/JobManagerJobGroupTest.java  |  90 ++++++++++++++
 .../flink/metrics/groups/OperatorGroupTest.java |   2 +-
 .../flink/metrics/groups/TaskGroupTest.java     |   6 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |  94 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  41 ++++++-
 .../testutils/UnregisteredTaskMetricsGroup.java |   4 +-
 .../runtime/testingUtils/TestingCluster.scala   |   6 +-
 .../testingUtils/TestingJobManager.scala        |   7 +-
 .../runtime/testingUtils/TestingUtils.scala     |   6 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   7 +-
 22 files changed, 778 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index f283ce3..09beef6 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -52,8 +52,10 @@ public class MetricRegistry {
 	public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
 	public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
 
+	public static final String KEY_METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
 	public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
-	public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job";
+	public static final String KEY_METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job";
+	public static final String KEY_METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job";
 	public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
 	public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
 
@@ -243,16 +245,20 @@ public class MetricRegistry {
 	}
 
 	static ScopeFormats createScopeConfig(Configuration config) {
+		String jmFormat = config.getString(
+				KEY_METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
+		String jmJobFormat = config.getString(
+			KEY_METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
 		String tmFormat = config.getString(
 				KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String jobFormat = config.getString(
-				KEY_METRICS_SCOPE_NAMING_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+		String tmJobFormat = config.getString(
+				KEY_METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
 		String taskFormat = config.getString(
 				KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
 		String operatorFormat = config.getString(
 				KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
 		
-		return new ScopeFormats(tmFormat, jobFormat, taskFormat, operatorFormat);
+		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
new file mode 100644
index 0000000..1dd0439
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
+ * a specific job, running on the JobManager.
+ */
+@Internal
+public class JobManagerJobMetricGroup extends JobMetricGroup {
+
+	/** The metrics group that contains this group */
+	private final JobManagerMetricGroup parent;
+
+	public JobManagerJobMetricGroup(
+		MetricRegistry registry,
+		JobManagerMetricGroup parent,
+		JobID jobId,
+		@Nullable String jobName) {
+
+		this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
+	}
+
+	public JobManagerJobMetricGroup(
+		MetricRegistry registry,
+		JobManagerMetricGroup parent,
+		JobManagerJobScopeFormat scopeFormat,
+		JobID jobId,
+		@Nullable String jobName) {
+
+		super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
+
+		this.parent = checkNotNull(parent);
+	}
+
+	public final JobManagerMetricGroup parent() {
+		return parent;
+	}
+
+	@Override
+	protected Iterable<? extends ComponentMetricGroup> subComponents() {
+		return Collections.emptyList();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
new file mode 100644
index 0000000..67e1117
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager.
+ *
+ * <p>Contains extra logic for adding job metric groups ({@link #addJob}) and for
+ * removing and closing them again ({@link #removeJob}).
+ */
+@Internal
+public class JobManagerMetricGroup extends ComponentMetricGroup {
+
+	private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();
+
+	private final String hostname;
+
+	public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
+		this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname);
+	}
+
+	public JobManagerMetricGroup(
+		MetricRegistry registry,
+		JobManagerScopeFormat scopeFormat,
+		String hostname) {
+
+		super(registry, scopeFormat.formatScope(hostname));
+		this.hostname = hostname;
+	}
+
+	public String hostname() {
+		return hostname;
+	}
+
+	// ------------------------------------------------------------------------
+	//  job groups
+	// ------------------------------------------------------------------------
+
+	public JobManagerJobMetricGroup addJob(
+		JobID jobId,
+		String jobName) {
+		// get or create a jobs metric group
+		JobManagerJobMetricGroup currentJobGroup;
+		synchronized (this) {
+			if (!isClosed()) {
+				currentJobGroup = jobs.get(jobId);
+
+				if (currentJobGroup == null || currentJobGroup.isClosed()) {
+					currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
+					jobs.put(jobId, currentJobGroup);
+				}
+				return currentJobGroup;
+			} else {
+				return null;
+			}
+		}
+	}
+
+	public void removeJob(JobID jobId) {
+		if (jobId == null) {
+			return;
+		}
+
+		synchronized (this) {
+			JobManagerJobMetricGroup containedGroup = jobs.remove(jobId);
+			if (containedGroup != null) {
+				containedGroup.close();
+			}
+		}
+	}
+
+	public int numRegisteredJobMetricGroups() {
+		return jobs.size();
+	}
+
+	@Override
+	protected Iterable<? extends ComponentMetricGroup> subComponents() {
+		return jobs.values();
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
index f816278..f7dfc78 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
@@ -21,66 +21,36 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
- * a specific job, running on the TaskManager.
- * 
- * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
+ * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
+ * a specific job.
  */
 @Internal
-public class JobMetricGroup extends ComponentMetricGroup {
-
-	/** The metrics group that contains this group */
-	private final TaskManagerMetricGroup parent;
-
-	/** Map from execution attempt ID (task identifier) to task metrics */
-	private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
+public abstract class JobMetricGroup extends ComponentMetricGroup {
 
 	/** The ID of the job represented by this metrics group */
-	private final JobID jobId;
+	protected final JobID jobId;
 
 	/** The name of the job represented by this metrics group */
 	@Nullable
-	private final String jobName;
+	protected final String jobName;
 
 	// ------------------------------------------------------------------------
 
-	public JobMetricGroup(
+	protected JobMetricGroup(
 			MetricRegistry registry,
-			TaskManagerMetricGroup parent,
 			JobID jobId,
-			@Nullable String jobName) {
+			@Nullable String jobName,
+			String[] scope) {
+		super(registry, scope);
 		
-		this(registry, checkNotNull(parent), registry.getScopeFormats().getJobFormat(), jobId, jobName);
-	}
-
-	public JobMetricGroup(
-			MetricRegistry registry,
-			TaskManagerMetricGroup parent,
-			TaskManagerJobScopeFormat scopeFormat, 
-			JobID jobId,
-			@Nullable String jobName) {
-
-		super(registry, scopeFormat.formatScope(parent, jobId, jobName));
-
-		this.parent = checkNotNull(parent);
-		this.jobId = checkNotNull(jobId);
+		this.jobId = jobId;
 		this.jobName = jobName;
 	}
 
-	public final TaskManagerMetricGroup parent() {
-		return parent;
-	}
-
 	public JobID jobId() {
 		return jobId;
 	}
@@ -89,53 +59,4 @@ public class JobMetricGroup extends ComponentMetricGroup {
 	public String jobName() {
 		return jobName;
 	}
-
-	// ------------------------------------------------------------------------
-	//  adding / removing tasks
-	// ------------------------------------------------------------------------
-
-	public TaskMetricGroup addTask(
-			AbstractID vertexId,
-			AbstractID executionId,
-			String taskName,
-			int subtaskIndex,
-			int attemptNumber) {
-		
-		checkNotNull(executionId);
-
-		synchronized (this) {
-			if (!isClosed()) {
-				TaskMetricGroup task = new TaskMetricGroup(registry, this, 
-						vertexId, executionId, taskName, subtaskIndex, attemptNumber);
-				tasks.put(executionId, task);
-				return task;
-			} else {
-				return null;
-			}
-		}
-	}
-
-	public void removeTaskMetricGroup(AbstractID executionId) {
-		checkNotNull(executionId);
-
-		boolean removeFromParent = false;
-		synchronized (this) {
-			if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
-				// this call removed the last task. close this group.
-				removeFromParent = true;
-				close();
-			}
-		}
-
-		// IMPORTANT: removing from the parent must happen while holding the this group's lock,
-		//      because it would violate the "first parent then subgroup" lock acquisition order
-		if (removeFromParent) {
-			parent.removeJobMetricsGroup(jobId, this);
-		}
-	}
-
-	@Override
-	protected Iterable<? extends ComponentMetricGroup> subComponents() {
-		return tasks.values();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
new file mode 100644
index 0000000..fdaf1de
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
+ * a specific job, running on the TaskManager.
+ *
+ * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
+ */
+@Internal
+public class TaskManagerJobMetricGroup extends JobMetricGroup {
+
+	/** The metrics group that contains this group */
+	private final TaskManagerMetricGroup parent;
+
+	/** Map from execution attempt ID (task identifier) to task metrics */
+	private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
+
+	// ------------------------------------------------------------------------
+
+	public TaskManagerJobMetricGroup(
+		MetricRegistry registry,
+		TaskManagerMetricGroup parent,
+		JobID jobId,
+		@Nullable String jobName) {
+
+		this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
+	}
+
+	public TaskManagerJobMetricGroup(
+		MetricRegistry registry,
+		TaskManagerMetricGroup parent,
+		TaskManagerJobScopeFormat scopeFormat,
+		JobID jobId,
+		@Nullable String jobName) {
+
+		super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
+
+		this.parent = checkNotNull(parent);
+	}
+
+	public final TaskManagerMetricGroup parent() {
+		return parent;
+	}
+
+	// ------------------------------------------------------------------------
+	//  adding / removing tasks
+	// ------------------------------------------------------------------------
+
+	public TaskMetricGroup addTask(
+		AbstractID vertexId,
+		AbstractID executionId,
+		String taskName,
+		int subtaskIndex,
+		int attemptNumber) {
+
+		checkNotNull(executionId);
+
+		synchronized (this) {
+			if (!isClosed()) {
+				TaskMetricGroup task = new TaskMetricGroup(registry, this,
+					vertexId, executionId, taskName, subtaskIndex, attemptNumber);
+				tasks.put(executionId, task);
+				return task;
+			} else {
+				return null;
+			}
+		}
+	}
+
+	public void removeTaskMetricGroup(AbstractID executionId) {
+		checkNotNull(executionId);
+
+		boolean removeFromParent = false;
+		synchronized (this) {
+			if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
+				// this call removed the last task. close this group.
+				removeFromParent = true;
+				close();
+			}
+		}
+
+		// IMPORTANT: removing from the parent must not happen while holding this group's lock,
+		//      because it would violate the "first parent then subgroup" lock acquisition order
+		if (removeFromParent) {
+			parent.removeJobMetricsGroup(jobId, this);
+		}
+	}
+
+	@Override
+	protected Iterable<? extends ComponentMetricGroup> subComponents() {
+		return tasks.values();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
index 3cb3936..2b2b201 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
@@ -36,7 +36,7 @@ import java.util.Map;
 @Internal
 public class TaskManagerMetricGroup extends ComponentMetricGroup {
 
-	private final Map<JobID, JobMetricGroup> jobs = new HashMap<>();
+	private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>();
 
 	private final String hostname;
 
@@ -82,12 +82,12 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
 		// because it might lead to a deadlock
 		while (true) {
 			// get or create a jobs metric group
-			JobMetricGroup currentJobGroup;
+			TaskManagerJobMetricGroup currentJobGroup;
 			synchronized (this) {
 				currentJobGroup = jobs.get(jobId);
-				
+
 				if (currentJobGroup == null || currentJobGroup.isClosed()) {
-					currentJobGroup = new JobMetricGroup(registry, this, jobId, jobName);
+					currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName);
 					jobs.put(jobId, currentJobGroup);
 				}
 			}
@@ -106,14 +106,14 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
 		}
 	}
 
-	public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) {
+	public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
 		if (jobId == null || group == null || !group.isClosed()) {
 			return;
 		}
 
 		synchronized (this) {
 			// optimistically remove the currently contained group, and check later if it was correct
-			JobMetricGroup containedGroup = jobs.remove(jobId);
+			TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
 
 			// check if another group was actually contained, and restore that one
 			if (containedGroup != null && containedGroup != group) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
index 784578b..c0428ac 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class TaskMetricGroup extends ComponentMetricGroup {
 
 	/** The job metrics group containing this task metrics group */
-	private final JobMetricGroup parent;
+	private final TaskManagerJobMetricGroup parent;
 
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
@@ -61,7 +61,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
 
 	public TaskMetricGroup(
 			MetricRegistry registry,
-			JobMetricGroup parent,
+			TaskManagerJobMetricGroup parent,
 			@Nullable AbstractID vertexId,
 			AbstractID executionId,
 			@Nullable String taskName,
@@ -74,7 +74,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
 
 	public TaskMetricGroup(
 			MetricRegistry registry,
-			JobMetricGroup parent,
+			TaskManagerJobMetricGroup parent,
 			TaskScopeFormat scopeFormat, 
 			@Nullable AbstractID vertexId,
 			AbstractID executionId,
@@ -99,7 +99,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
 	//  properties
 	// ------------------------------------------------------------------------
 
-	public final JobMetricGroup parent() {
+	public final TaskManagerJobMetricGroup parent() {
 		return parent;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
index 9637f65..b73cf51 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
@@ -19,7 +19,9 @@
 package org.apache.flink.metrics.groups.scope;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
@@ -66,29 +68,47 @@ public abstract class ScopeFormat {
 	//  Scope Variables
 	// ------------------------------------------------------------------------
 
+	public static final String SCOPE_ACTOR_HOST = asVariable("host");
+
+	// ----- Job Manager ----
+
+	/** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */
+	public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
+		concat(SCOPE_ACTOR_HOST, "jobmanager");
+
+	/** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */
+	public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
+
 	// ----- Task Manager ----
 
-	public static final String SCOPE_TASKMANAGER_HOST = asVariable("host");
 	public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
 
 	/** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
 	public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
-			concat(SCOPE_TASKMANAGER_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
+			concat(SCOPE_ACTOR_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
 
 	/** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
 	public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
 
-	// ----- Job on Task Manager ----
+	// ----- Job -----
 
 	public static final String SCOPE_JOB_ID = asVariable("job_id");
 	public static final String SCOPE_JOB_NAME = asVariable("job_name");
 
 	/** The default scope format for the job component: {@code "<job_name>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = SCOPE_JOB_NAME;
+	public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
+
+	// ----- Job on Job Manager ----
+
+	/** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */
+	public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
+		concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
+
+	// ----- Job on Task Manager ----
 
-	/** The default scope format for all job metrics: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
+	/** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
 	public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT);
+			concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
 
 	// ----- Task ----
 
@@ -125,13 +145,31 @@ public abstract class ScopeFormat {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * The scope format for the {@link JobManagerMetricGroup}.
+	 */
+	public static class JobManagerScopeFormat extends ScopeFormat {
+
+		public JobManagerScopeFormat(String format) {
+			super(format, null, new String[] {
+				SCOPE_ACTOR_HOST
+			});
+		}
+
+		public String[] formatScope(String hostname) {
+			final String[] template = copyTemplate();
+			final String[] values = { hostname };
+			return bindVariables(template, values);
+		}
+	}
+
+	/**
 	 * The scope format for the {@link TaskManagerMetricGroup}.
 	 */
 	public static class TaskManagerScopeFormat extends ScopeFormat {
 
 		public TaskManagerScopeFormat(String format) {
 			super(format, null, new String[] {
-					SCOPE_TASKMANAGER_HOST,
+					SCOPE_ACTOR_HOST,
 					SCOPE_TASKMANAGER_ID
 			});
 		}
@@ -148,11 +186,35 @@ public abstract class ScopeFormat {
 	/**
 	 * The scope format for the {@link JobMetricGroup}.
 	 */
+	public static class JobManagerJobScopeFormat extends ScopeFormat {
+
+		public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) {
+			super(format, parentFormat, new String[] {
+				SCOPE_ACTOR_HOST,
+				SCOPE_JOB_ID,
+				SCOPE_JOB_NAME
+			});
+		}
+
+		public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) {
+			final String[] template = copyTemplate();
+			final String[] values = {
+				parent.hostname(),
+				valueOrNull(jid),
+				valueOrNull(jobName)
+			};
+			return bindVariables(template, values);
+		}
+	}
+
+	/**
+	 * The scope format for the {@link TaskManagerJobMetricGroup}.
+	 */
 	public static class TaskManagerJobScopeFormat extends ScopeFormat {
 
 		public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) {
 			super(format, parentFormat, new String[] {
-					SCOPE_TASKMANAGER_HOST,
+					SCOPE_ACTOR_HOST,
 					SCOPE_TASKMANAGER_ID,
 					SCOPE_JOB_ID,
 					SCOPE_JOB_NAME
@@ -180,7 +242,7 @@ public abstract class ScopeFormat {
 
 		public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) {
 			super(format, parentFormat, new String[] {
-					SCOPE_TASKMANAGER_HOST,
+					SCOPE_ACTOR_HOST,
 					SCOPE_TASKMANAGER_ID,
 					SCOPE_JOB_ID,
 					SCOPE_JOB_NAME,
@@ -193,7 +255,7 @@ public abstract class ScopeFormat {
 		}
 
 		public String[] formatScope(
-				JobMetricGroup parent,
+				TaskManagerJobMetricGroup parent,
 				AbstractID vertexId, AbstractID attemptId,
 				String taskName, int subtask, int attemptNumber) {
 
@@ -222,7 +284,7 @@ public abstract class ScopeFormat {
 
 		public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
 			super(format, parentFormat, new String[] {
-					SCOPE_TASKMANAGER_HOST,
+					SCOPE_ACTOR_HOST,
 					SCOPE_TASKMANAGER_ID,
 					SCOPE_JOB_ID,
 					SCOPE_JOB_NAME,

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
index 1451637..978e761 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
@@ -19,6 +19,8 @@
 package org.apache.flink.metrics.groups.scope;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
 import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
 import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
 import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
@@ -32,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class ScopeFormats {
 
+	private final JobManagerScopeFormat jobManagerFormat;
+	private final JobManagerJobScopeFormat jobManagerJobFormat;
 	private final TaskManagerScopeFormat taskManagerFormat;
 	private final TaskManagerJobScopeFormat taskManagerJobFormat;
 	private final TaskScopeFormat taskFormat;
@@ -43,6 +47,11 @@ public class ScopeFormats {
 	 * Creates all default scope formats.
 	 */
 	public ScopeFormats() {
+		this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+
+		this.jobManagerJobFormat = new JobManagerJobScopeFormat(
+			ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
+
 		this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
 
 		this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
@@ -59,11 +68,15 @@ public class ScopeFormats {
 	 * Creates all scope formats, based on the given scope format strings.
 	 */
 	public ScopeFormats(
+			String jobManagerFormat,
+			String jobManagerJobFormat,
 			String taskManagerFormat,
 			String taskManagerJobFormat,
 			String taskFormat,
 			String operatorFormat)
 	{
+		this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat);
+		this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
 		this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat);
 		this.taskManagerJobFormat = new TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat);
 		this.taskFormat = new TaskScopeFormat(taskFormat, this.taskManagerJobFormat);
@@ -74,11 +87,15 @@ public class ScopeFormats {
 	 * Creates a {@code ScopeFormats} with the given scope formats.
 	 */
 	public ScopeFormats(
+			JobManagerScopeFormat jobManagerFormat,
+			JobManagerJobScopeFormat jobManagerJobFormat,
 			TaskManagerScopeFormat taskManagerFormat,
 			TaskManagerJobScopeFormat taskManagerJobFormat,
 			TaskScopeFormat taskFormat,
 			OperatorScopeFormat operatorFormat)
 	{
+		this.jobManagerFormat = checkNotNull(jobManagerFormat);
+		this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
 		this.taskManagerFormat = checkNotNull(taskManagerFormat);
 		this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat);
 		this.taskFormat = checkNotNull(taskFormat);
@@ -87,14 +104,22 @@ public class ScopeFormats {
 
 	// ------------------------------------------------------------------------
 
+	public JobManagerScopeFormat getJobManagerFormat() {
+		return this.jobManagerFormat;
+	}
+
 	public TaskManagerScopeFormat getTaskManagerFormat() {
 		return this.taskManagerFormat;
 	}
 
-	public TaskManagerJobScopeFormat getJobFormat() {
+	public TaskManagerJobScopeFormat getTaskManagerJobFormat() {
 		return this.taskManagerJobFormat;
 	}
 
+	public JobManagerJobScopeFormat getJobManagerJobFormat() {
+		return this.jobManagerJobFormat;
+	}
+
 	public TaskScopeFormat getTaskFormat() {
 		return this.taskFormat;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
index 8b71816..77acd3c 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -164,14 +164,14 @@ public class MetricRegistryTest extends TestLogger {
 		Configuration config = new Configuration();
 
 		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM, "A");
-		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_JOB, "B");
+		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM_JOB, "B");
 		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, "C");
 		config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D");
 
 		ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config);
 
 		assertEquals("A", scopeConfig.getTaskManagerFormat().format());
-		assertEquals("B", scopeConfig.getJobFormat().format());
+		assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
 		assertEquals("C", scopeConfig.getTaskFormat().format());
 		assertEquals("D", scopeConfig.getOperatorFormat().format());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
deleted file mode 100644
index 4bcb1ee..0000000
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class JobGroupTest {
-
-	@Test
-	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
-
-		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
-
-		assertArrayEquals(
-				new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"},
-				jmGroup.getScopeComponents());
-
-		assertEquals(
-				"theHostName.taskmanager.test-tm-id.myJobName",
-				jmGroup.getScopeString());
-		registry.shutdown();
-	}
-
-	@Test
-	public void testGenerateScopeCustom() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
-
-		TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc");
-		TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
-
-		JobID jid = new JobID();
-
-		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
-
-		assertArrayEquals(
-				new String[] { "some-constant", "myJobName" },
-				jmGroup.getScopeComponents());
-
-		assertEquals(
-				"some-constant.myJobName",
-				jmGroup.getScopeString());
-		registry.shutdown();
-	}
-
-	@Test
-	public void testGenerateScopeCustomWildcard() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
-
-		TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>");
-		TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
-
-		JobID jid = new JobID();
-
-		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
-
-		assertArrayEquals(
-				new String[] { "peter", "test-tm-id", "some-constant", jid.toString() },
-				jmGroup.getScopeComponents());
-
-		assertEquals(
-				"peter.test-tm-id.some-constant." + jid,
-				jmGroup.getScopeString());
-		registry.shutdown();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
new file mode 100644
index 0000000..8853f20
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JobManagerGroupTest {
+
+	// ------------------------------------------------------------------------
+	//  adding and removing jobs
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void addAndRemoveJobs() {
+		final JobManagerMetricGroup group = new JobManagerMetricGroup(
+			new MetricRegistry(new Configuration()), "localhost");
+
+		final JobID jid1 = new JobID();
+		final JobID jid2 = new JobID();
+
+		final String jobName1 = "testjob";
+		final String jobName2 = "anotherJob";
+
+		JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1);
+		JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, jobName1);
+		JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2);
+
+		assertEquals(jmJobGroup11, jmJobGroup12);
+
+		assertEquals(2, group.numRegisteredJobMetricGroups());
+
+		group.removeJob(jid1);
+
+		assertTrue(jmJobGroup11.isClosed());
+		assertEquals(1, group.numRegisteredJobMetricGroups());
+
+		group.removeJob(jid2);
+
+		assertTrue(jmJobGroup21.isClosed());
+		assertEquals(0, group.numRegisteredJobMetricGroups());
+	}
+
+	@Test
+	public void testCloseClosesAll() {
+		final JobManagerMetricGroup group = new JobManagerMetricGroup(
+			new MetricRegistry(new Configuration()), "localhost");
+
+		final JobID jid1 = new JobID();
+		final JobID jid2 = new JobID();
+
+		final String jobName1 = "testjob";
+		final String jobName2 = "anotherJob";
+
+		JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1);
+		JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2);
+
+		group.close();
+
+		assertTrue(jmJobGroup11.isClosed());
+		assertTrue(jmJobGroup21.isClosed());
+	}
+
+	// ------------------------------------------------------------------------
+	//  scope name tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testGenerateScopeDefault() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
+
+		assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
+		assertEquals("localhost.jobmanager", group.getScopeString());
+	}
+
+	@Test
+	public void testGenerateScopeCustom() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+		JobManagerScopeFormat format = new JobManagerScopeFormat("constant.<host>.foo.<host>");
+		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host");
+
+		assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
+		assertEquals("constant.host.foo.host", group.getScopeString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
new file mode 100644
index 0000000..3833cb8
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerJobGroupTest {
+
+	@Test
+	public void testGenerateScopeDefault() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
+		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
+		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+
+		assertArrayEquals(
+				new String[] { "theHostName", "jobmanager", "myJobName"},
+				jmGroup.getScopeComponents());
+
+		assertEquals(
+				"theHostName.jobmanager.myJobName",
+				jmGroup.getScopeString());
+	}
+
+	@Test
+	public void testGenerateScopeCustom() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
+		JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("abc");
+		JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+
+		JobID jid = new JobID();
+
+		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
+		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+		assertArrayEquals(
+				new String[] { "some-constant", "myJobName" },
+				jmGroup.getScopeComponents());
+
+		assertEquals(
+				"some-constant.myJobName",
+				jmGroup.getScopeString());
+	}
+
+	@Test
+	public void testGenerateScopeCustomWildcard() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
+		JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("peter");
+		JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+		JobID jid = new JobID();
+
+		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, tmFormat, "theHostName");
+		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+		assertArrayEquals(
+				new String[] { "peter", "some-constant", jid.toString() },
+				jmGroup.getScopeComponents());
+
+		assertEquals(
+				"peter.some-constant." + jid,
+				jmGroup.getScopeString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
index c0c8842..9641632 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
@@ -35,7 +35,7 @@ public class OperatorGroupTest {
 		MetricRegistry registry = new MetricRegistry(new Configuration());
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 		TaskMetricGroup taskGroup = new TaskMetricGroup(
 				registry, jmGroup,  new AbstractID(),  new AbstractID(), "aTaskName", 11, 0);
 		OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName");

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
index 88f425b..357852a 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
@@ -60,7 +60,7 @@ public class TaskGroupTest {
 		AbstractID executionId = new AbstractID();
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 		TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2);
 
 		assertArrayEquals(
@@ -86,7 +86,7 @@ public class TaskGroupTest {
 		AbstractID executionId = new AbstractID();
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jid, "myJobName");
+		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
 		TaskMetricGroup taskGroup = new TaskMetricGroup(
 				registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2);
 
@@ -114,7 +114,7 @@ public class TaskGroupTest {
 		AbstractID executionId = new AbstractID();
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-		JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 
 		TaskMetricGroup taskGroup = new TaskMetricGroup(
 				registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
new file mode 100644
index 0000000..5cec70b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TaskManagerJobGroupTest {
+
+	@Test
+	public void testGenerateScopeDefault() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+		JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+
+		assertArrayEquals(
+				new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"},
+				jmGroup.getScopeComponents());
+
+		assertEquals(
+				"theHostName.taskmanager.test-tm-id.myJobName",
+				jmGroup.getScopeString());
+		registry.shutdown();
+	}
+
+	@Test
+	public void testGenerateScopeCustom() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
+		TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc");
+		TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+
+		JobID jid = new JobID();
+
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+		JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+		assertArrayEquals(
+				new String[] { "some-constant", "myJobName" },
+				jmGroup.getScopeComponents());
+
+		assertEquals(
+				"some-constant.myJobName",
+				jmGroup.getScopeString());
+		registry.shutdown();
+	}
+
+	@Test
+	public void testGenerateScopeCustomWildcard() {
+		MetricRegistry registry = new MetricRegistry(new Configuration());
+
+		TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>");
+		TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+		JobID jid = new JobID();
+
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
+		JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+		assertArrayEquals(
+				new String[] { "peter", "test-tm-id", "some-constant", jid.toString() },
+				jmGroup.getScopeComponents());
+
+		assertEquals(
+				"peter.test-tm-id.some-constant." + jid,
+				jmGroup.getScopeString());
+		registry.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 60f0a97..be1caa5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
+import org.apache.flink.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
@@ -124,7 +126,8 @@ class JobManager(
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
     protected val savepointStore: SavepointStore,
-    protected val jobRecoveryTimeout: FiniteDuration)
+    protected val jobRecoveryTimeout: FiniteDuration,
+    protected val metricsRegistry: Option[FlinkMetricRegistry])
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin order is important, we want filtering after logging
   with LogMessages // mixin order is important, we want first logging
@@ -149,6 +152,16 @@ class JobManager(
 
   var leaderSessionID: Option[UUID] = None
 
+  protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
+    case Some(registry) =>
+      val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+      Option(new JobManagerMetricGroup(
+        registry, NetUtils.ipAddressToUrlString(InetAddress.getByName(host))))
+    case None =>
+      log.warn("Could not instantiate JobManager metrics.")
+      None
+  }
+
   /** Futures which have to be completed before terminating the job manager */
   var futuresToComplete: Option[Seq[Future[Unit]]] = None
 
@@ -269,6 +282,13 @@ class JobManager(
     // shut down the extra thread pool for futures
     executorService.shutdown()
 
+    // failsafe shutdown of the metrics registry
+    try {
+      metricsRegistry.map(_.shutdown())
+    } catch {
+      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
+    }
+
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -2266,7 +2286,8 @@ object JobManager {
     SubmittedJobGraphStore,
     CheckpointRecoveryFactory,
     SavepointStore,
-    FiniteDuration // timeout for job recovery
+    FiniteDuration, // timeout for job recovery
+    Option[FlinkMetricRegistry]
    ) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -2358,6 +2379,13 @@ object JobManager {
       }
     }
 
+    val metricRegistry = try {
+      Option(new FlinkMetricRegistry(configuration))
+    } catch {
+      case _: Exception =>
+        None
+    }
+
     (executorService,
       instanceManager,
       scheduler,
@@ -2369,7 +2397,8 @@ object JobManager {
       submittedJobGraphs,
       checkpointRecoveryFactory,
       savepointStore,
-      jobRecoveryTimeout)
+      jobRecoveryTimeout,
+      metricRegistry)
   }
 
   /**
@@ -2432,7 +2461,8 @@ object JobManager {
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) = createJobManagerComponents(
+    jobRecoveryTimeout, 
+    metricsRegistry) = createJobManagerComponents(
       configuration,
       None)
 
@@ -2458,7 +2488,8 @@ object JobManager {
       submittedJobGraphs,
       checkpointRecoveryFactory,
       savepointStore,
-      jobRecoveryTimeout)
+      jobRecoveryTimeout,
+      metricsRegistry)
 
     val jobManager: ActorRef = jobManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 6d3f768..3d8f298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -24,7 +24,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
 import org.apache.flink.metrics.groups.IOMetricGroup;
-import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -60,7 +60,7 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 		}
 	}
 
-	private static class DummyJobMetricGroup extends JobMetricGroup {
+	private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup {
 		
 		public DummyJobMetricGroup() {
 			super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 763bd36..b4ba40b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -107,7 +107,8 @@ class TestingCluster(
     submittedJobsGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) = JobManager.createJobManagerComponents(
+    jobRecoveryTimeout,
+    metricRegistry) = JobManager.createJobManagerComponents(
       config,
       createLeaderElectionService())
 
@@ -128,7 +129,8 @@ class TestingCluster(
         submittedJobsGraphs,
         checkpointRecoveryFactory,
         savepointStore,
-        jobRecoveryTimeout))
+        jobRecoveryTimeout,
+        metricRegistry))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e854b13..04689c6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.ActorRef
 
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -50,7 +51,8 @@ class TestingJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     savepointStore : SavepointStore,
-    jobRecoveryTimeout: FiniteDuration)
+    jobRecoveryTimeout : FiniteDuration,
+    metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
       executorService,
@@ -64,5 +66,6 @@ class TestingJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout)
+    jobRecoveryTimeout,
+    metricRegistry)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index b67e319..2f43d38 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -358,7 +358,8 @@ object TestingUtils {
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) = JobManager.createJobManagerComponents(
+    jobRecoveryTimeout,
+    metricsRegistry) = JobManager.createJobManagerComponents(
       configuration,
       None
     )
@@ -380,7 +381,8 @@ object TestingUtils {
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      jobRecoveryTimeout)
+      jobRecoveryTimeout,
+      metricsRegistry)
 
     val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 23b3adc..3df1adc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,6 +24,7 @@ import akka.actor.ActorRef
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -70,7 +71,8 @@ class YarnJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     savepointStore: SavepointStore,
-    jobRecoveryTimeout: FiniteDuration)
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
     executorService,
@@ -84,7 +86,8 @@ class YarnJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) {
+    jobRecoveryTimeout,
+    metricsRegistry) {
 
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
   val YARN_HEARTBEAT_DELAY: FiniteDuration =


Mime
View raw message