flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/7] flink git commit: [FLINK-1502] [core] Cleanups, robustness, and performance improvements in the metrics system
Date Mon, 30 May 2016 20:43:36 GMT
[FLINK-1502] [core] Cleanups, robustness, and performance improvements in the metrics system


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad8375a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad8375a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad8375a

Branch: refs/heads/master
Commit: 7ad8375a89374bec80571029e9166f1336bdea8e
Parents: d3e3bd5
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed May 25 20:34:44 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon May 30 14:54:10 2016 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm/pom.xml               |  22 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   |  13 +-
 .../common/operators/CollectionExecutor.java    |  32 +-
 .../java/org/apache/flink/metrics/Counter.java  |   5 +-
 .../java/org/apache/flink/metrics/Gauge.java    |   3 +-
 .../java/org/apache/flink/metrics/Metric.java   |   3 +-
 .../apache/flink/metrics/MetricRegistry.java    |  92 ++---
 .../metrics/groups/AbstractMetricGroup.java     |  98 +++--
 .../metrics/groups/ComponentMetricGroup.java    | 109 +----
 .../metrics/groups/GenericMetricGroup.java      |  45 +--
 .../flink/metrics/groups/IOMetricGroup.java     |  27 +-
 .../flink/metrics/groups/JobMetricGroup.java    |  88 ++--
 .../groups/NonRegisteringMetricsGroup.java      |  87 ----
 .../metrics/groups/OperatorMetricGroup.java     |  35 +-
 .../org/apache/flink/metrics/groups/Scope.java  | 119 ------
 .../metrics/groups/TaskManagerMetricGroup.java  |  68 ++--
 .../flink/metrics/groups/TaskMetricGroup.java   | 126 ++++--
 .../groups/UnregisteredMetricsGroup.java        |  73 ++++
 .../flink/metrics/groups/scope/ScopeFormat.java | 399 +++++++++++++++++++
 .../metrics/groups/scope/ScopeFormats.java      | 105 +++++
 .../metrics/reporter/AbstractReporter.java      |  40 +-
 .../flink/metrics/reporter/JMXReporter.java     | 194 ++++++---
 .../flink/metrics/reporter/MetricReporter.java  |  53 +--
 .../flink/metrics/reporter/Scheduled.java       |   8 +-
 .../functions/util/RuntimeUDFContextTest.java   |  33 +-
 .../api/common/io/RichInputFormatTest.java      |  18 +-
 .../api/common/io/RichOutputFormatTest.java     |  19 +-
 .../operators/GenericDataSinkBaseTest.java      |  16 +-
 .../operators/GenericDataSourceBaseTest.java    |  18 +-
 .../base/FlatMapOperatorCollectionTest.java     |   9 +-
 .../base/InnerJoinOperatorBaseTest.java         |  15 +-
 .../common/operators/base/MapOperatorTest.java  |  36 +-
 .../base/PartitionMapOperatorTest.java          |  19 +-
 .../flink/metrics/MetricRegistryTest.java       |  78 +---
 .../flink/metrics/groups/JobGroupTest.java      |  73 ++--
 .../groups/MetricGroupRegistrationTest.java     |   7 +-
 .../flink/metrics/groups/MetricGroupTest.java   |  32 +-
 .../flink/metrics/groups/OperatorGroupTest.java |  66 +--
 .../flink/metrics/groups/TaskGroupTest.java     | 103 +++--
 .../metrics/groups/TaskManagerGroupTest.java    |  58 +--
 .../flink/metrics/reporter/JMXReporterTest.java |  35 +-
 .../flink/metrics/util/DummyJobMetricGroup.java |  47 ---
 .../flink/metrics/util/DummyMetricGroup.java    |  57 ---
 .../flink/metrics/util/DummyMetricRegistry.java |  29 --
 .../metrics/util/DummyOperatorMetricGroup.java  |  37 --
 .../flink/metrics/util/DummyReporter.java       |  47 ---
 .../util/DummyTaskManagerMetricGroup.java       |  42 --
 .../metrics/util/DummyTaskMetricGroup.java      |  42 --
 .../apache/flink/metrics/util/TestReporter.java |  11 +-
 .../base/CoGroupOperatorCollectionTest.java     |   6 +-
 .../operators/base/GroupReduceOperatorTest.java |  17 +-
 .../base/InnerJoinOperatorBaseTest.java         |  24 +-
 .../operators/base/ReduceOperatorTest.java      |  23 +-
 .../dropwizard/ScheduledDropwizardReporter.java | 104 +++--
 .../flink/dropwizard/metrics/GaugeWrapper.java  |   8 +
 .../flink/metrics/ganglia/GangliaReporter.java  |  79 ++++
 .../flink/metrics/graphite/GangliaReporter.java |  73 ----
 .../metrics/graphite/GraphiteReporter.java      |  17 +-
 .../flink/metrics/statsd/StatsDReporter.java    |  79 ++--
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../operators/drivers/TestTaskContext.java      |   4 +-
 .../testutils/BinaryOperatorTestBase.java       |   6 +-
 .../operators/testutils/DriverTestBase.java     |  31 +-
 .../operators/testutils/DummyEnvironment.java   |  28 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  68 ++++
 ...AlignedProcessingTimeWindowOperatorTest.java |  20 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  14 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   4 +-
 69 files changed, 1893 insertions(+), 1509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index a080a03..590f33d 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -35,6 +35,9 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+		
+		<!-- core dependencies -->
+		
 		<dependency>
 			<!-- Together with the dependency management section in flink-parent, this
 			pins the Kryo version of transitive dependencies to the Flink Kryo version -->
@@ -49,14 +52,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-core</artifactId>
 			<version>0.9.4</version>
@@ -77,6 +72,17 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index cb9ac1c..5da12ef 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -32,10 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.metrics.util.DummyMetricGroup;
-import org.apache.flink.metrics.util.DummyTaskMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -144,7 +143,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
 		when(taskContext.getTaskName()).thenReturn("name");
-		when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup());
+		when(taskContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
 
 		final IRichBolt bolt = mock(IRichBolt.class);
 
@@ -229,7 +228,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
 		when(taskContext.getTaskName()).thenReturn("name");
-		when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup());
+		when(taskContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
 
 		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
@@ -294,7 +293,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
 		when(taskContext.getTaskName()).thenReturn("name");
-		when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup());
+		when(taskContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
 
 		final IRichBolt bolt = mock(IRichBolt.class);
 		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
@@ -367,7 +366,7 @@ public class BoltWrapperTest extends AbstractTest {
 		Environment env = mock(Environment.class);
 		when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
-		when(env.getMetricGroup()).thenReturn(new DummyTaskMetricGroup());
+		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
 
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 97f0c5e..913b205 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -36,7 +36,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -62,7 +61,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.NonRegisteringMetricsGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
@@ -72,8 +71,6 @@ import org.apache.flink.util.Visitor;
 @Internal
 public class CollectionExecutor {
 	
-	private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true;
-	
 	private final Map<Operator<?>, List<?>> intermediateResults;
 	
 	private final Map<String, Accumulator<?, ?>> accumulators;
@@ -109,11 +106,6 @@ public class CollectionExecutor {
 	
 	public JobExecutionResult execute(Plan program) throws Exception {
 		long startTime = System.currentTimeMillis();
-		
-		JobID jobID = program.getJobId();
-		if (jobID == null) {
-			jobID = new JobID();
-		}
 
 		initCache(program.getCachedFiles());
 		Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks();
@@ -194,7 +186,7 @@ public class CollectionExecutor {
 		TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
 		RuntimeUDFContext ctx;
 
-		MetricGroup metrics = NonRegisteringMetricsGroup.get();
+		MetricGroup metrics = new UnregisteredMetricsGroup();
 			
 		if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
@@ -215,7 +207,7 @@ public class CollectionExecutor {
 		
 		RuntimeUDFContext ctx;
 
-		MetricGroup metrics = NonRegisteringMetricsGroup.get();
+		MetricGroup metrics = new UnregisteredMetricsGroup();
 		if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
 					new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
@@ -241,7 +233,7 @@ public class CollectionExecutor {
 		TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
 		RuntimeUDFContext ctx;
 
-		MetricGroup metrics = NonRegisteringMetricsGroup.get();
+		MetricGroup metrics = new UnregisteredMetricsGroup();
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
 					new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
@@ -253,10 +245,8 @@ public class CollectionExecutor {
 		} else {
 			ctx = null;
 		}
-		
-		List<OUT> result = typedOp.executeOnCollections(inputData, ctx, executionConfig);
-		
-		return result;
+
+		return typedOp.executeOnCollections(inputData, ctx, executionConfig);
 	}
 	
 	private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?, ?, ?> operator, int superStep) throws Exception {
@@ -283,7 +273,7 @@ public class CollectionExecutor {
 		TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
 		RuntimeUDFContext ctx;
 
-		MetricGroup metrics = NonRegisteringMetricsGroup.get();
+		MetricGroup metrics = new UnregisteredMetricsGroup();
 	
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
@@ -296,10 +286,8 @@ public class CollectionExecutor {
 		} else {
 			ctx = null;
 		}
-		
-		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig);
-		
-		return result;
+
+		return typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig);
 	}
 	
 	@SuppressWarnings("unchecked")
@@ -448,7 +436,7 @@ public class CollectionExecutor {
 				solutionMap.put(wrapper, delta);
 			}
 
-			currentWorkset = (List<?>) execute(iteration.getNextWorkset(), superstep);
+			currentWorkset = execute(iteration.getNextWorkset(), superstep);
 
 			if (currentWorkset.isEmpty()) {
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
index 201a613..acc37cf 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -24,9 +25,9 @@ import org.apache.flink.annotation.PublicEvolving;
  */
 @PublicEvolving
 public final class Counter implements Metric {
-	
+
 	private long count;
-	
+
 	/**
 	 * Increment the current count by 1.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
index cca105e..aad8deb 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -24,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
  */
 @PublicEvolving
 public abstract class Gauge<T> implements Metric {
-	
+
 	/**
 	 * Calculates and returns the measured value.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
index 11cfcc6..8054de0 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics;
 
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * Common interface for all metrics.
+ * Common super interface for all metrics.
  */
 @PublicEvolving
 public interface Metric {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index 7e06217..e5d3477 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -21,7 +21,8 @@ package org.apache.flink.metrics;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.Scope;
+import org.apache.flink.metrics.groups.scope.ScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormats;
 import org.apache.flink.metrics.reporter.JMXReporter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
@@ -32,11 +33,6 @@ import org.slf4j.LoggerFactory;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
-import static org.apache.flink.metrics.groups.OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR;
-import static org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
-import static org.apache.flink.metrics.groups.TaskMetricGroup.DEFAULT_SCOPE_TASK;
-
 /**
  * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
  * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
@@ -52,10 +48,10 @@ public class MetricRegistry {
 	public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
 	public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";
 
-	public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
-	public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job";
-	public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
-	public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
+	public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scopeName.tm";
+	public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scopeName.job";
+	public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scopeName.task";
+	public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scopeName.operator";
 
 	// ------------------------------------------------------------------------
 	//  configuration keys
@@ -66,23 +62,22 @@ public class MetricRegistry {
 	private final MetricReporter reporter;
 	private final java.util.Timer timer;
 
-	private final Scope.ScopeFormat scopeConfig;
+	private final ScopeFormats scopeFormats;
 
 	/**
 	 * Creates a new MetricRegistry and starts the configured reporter.
 	 */
 	public MetricRegistry(Configuration config) {
-		// first parse the scope formats, these are needed for all reporters
-		
-		Scope.ScopeFormat scopeFormat;
+		// first parse the scopeName formats, these are needed for all reporters
+		ScopeFormats scopeFormats;
 		try {
-			scopeFormat = createScopeConfig(config);
+			scopeFormats = createScopeConfig(config);
 		}
 		catch (Exception e) {
-			scopeFormat = createScopeConfig(new Configuration());
-			LOG.warn("Failed to parse scope format, using default scope formats", e);
+			LOG.warn("Failed to parse scopeName format, using default scopeName formats", e);
+			scopeFormats = new ScopeFormats();
 		}
-		this.scopeConfig = scopeFormat;
+		this.scopeFormats = scopeFormats;
 
 		// second, instantiate any custom configured reporters
 		
@@ -157,8 +152,8 @@ public class MetricRegistry {
 		}
 	}
 
-	public Scope.ScopeFormat getScopeConfig() {
-		return this.scopeConfig;
+	public ScopeFormats getScopeFormats() {
+		return scopeFormats;
 	}
 
 	// ------------------------------------------------------------------------
@@ -166,35 +161,36 @@ public class MetricRegistry {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Registers a new {@link org.apache.flink.metrics.Metric} with this registry.
+	 * Registers a new {@link Metric} with this registry.
 	 *
-	 * @param metric metric to register
-	 * @param name   name of the metric
-	 * @param parent group that contains the metric
+	 * @param metric      the metric that was added
+	 * @param metricName  the name of the metric
+	 * @param group       the group that contains the metric
 	 */
-	public void register(Metric metric, String name, AbstractMetricGroup parent) {
-		String metricName = reporter.generateName(name, parent.generateScope());
-		this.reporter.notifyOfAddedMetric(metric, metricName);
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+		if (reporter != null) {
+			reporter.notifyOfAddedMetric(metric, metricName, group);
+		}
 	}
 
 	/**
 	 * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry.
 	 *
-	 * @param metric metric to un-register
-	 * @param name   name of the metric
-	 * @param parent group that contains the metric
+	 * @param metric      the metric that should be removed
+	 * @param metricName  the name of the metric
+	 * @param group       the group that contains the metric
 	 */
-	public void unregister(Metric metric, String name, AbstractMetricGroup parent) {
-		String metricName = reporter.generateName(name, parent.generateScope());
-		
-		this.reporter.notifyOfRemovedMetric(metric, metricName);
+	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+		if (reporter != null) {
+			reporter.notifyOfRemovedMetric(metric, metricName, group);
+		}
 	}
 
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 	
-	private static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) {
+	static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) {
 		Configuration reporterConfig = new Configuration();
 		reporterConfig.setLong("period", period);
 		reporterConfig.setString("timeunit", timeunit.name());
@@ -208,19 +204,17 @@ public class MetricRegistry {
 		return reporterConfig;
 	}
 
-	private static Scope.ScopeFormat createScopeConfig(Configuration config) {
-		String tmFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TM, DEFAULT_SCOPE_TM);
-		String jobFormat = config.getString(KEY_METRICS_SCOPE_NAMING_JOB, DEFAULT_SCOPE_JOB);
-		String taskFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TASK, DEFAULT_SCOPE_TASK);
-		String operatorFormat = config.getString(KEY_METRICS_SCOPE_NAMING_OPERATOR, DEFAULT_SCOPE_OPERATOR);
-
-
-		Scope.ScopeFormat format = new Scope.ScopeFormat();
-		format.setTaskManagerFormat(tmFormat);
-		format.setJobFormat(jobFormat);
-		format.setTaskFormat(taskFormat);
-		format.setOperatorFormat(operatorFormat);
-		return format;
+	static ScopeFormats createScopeConfig(Configuration config) {
+		String tmFormat = config.getString(
+				KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+		String jobFormat = config.getString(
+				KEY_METRICS_SCOPE_NAMING_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+		String taskFormat = config.getString(
+				KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
+		String operatorFormat = config.getString(
+				KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+		
+		return new ScopeFormats(tmFormat, jobFormat, taskFormat, operatorFormat);
 	}
 
 	// ------------------------------------------------------------------------
@@ -236,7 +230,7 @@ public class MetricRegistry {
 	 * which acts as a fail-safe to stop the timer thread and prevents resource leaks.
 	 */
 	private static final class ReporterTask extends TimerTask {
-		
+
 		private final Scheduled reporter;
 
 		private ReporterTask(Scheduled reporter) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index cad241d..032fa04 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -25,14 +25,12 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
 
+import org.apache.flink.metrics.groups.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,31 +53,58 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 	/** shared logger */
 	private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
 
-	private static final String METRIC_NAME_REGEX = "[a-zA-Z0-9]*";
-	
-	/** The pattern that metric and group names have to match */
-	private static final Pattern METRIC_NAME_PATTERN = Pattern.compile(METRIC_NAME_REGEX);
-
 	// ------------------------------------------------------------------------
 
 	/** The registry that this metrics group belongs to */
 	protected final MetricRegistry registry;
 
 	/** All metrics that are directly contained in this group */
-	protected final Map<String, Metric> metrics = new HashMap<>();
+	private final Map<String, Metric> metrics = new HashMap<>();
 
 	/** All metric subgroups of this group */
-	protected final Map<String, MetricGroup> groups = new HashMap<>();
+	private final Map<String, MetricGroup> groups = new HashMap<>();
+
+	/** The metrics scope represented by this group.
+	 *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
+	private final String[] scopeComponents;
+
+	/** The metrics scope represented by this group, as a concatenated string, lazily computed.
+	 * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
+	private String scopeString;
 
 	/** Flag indicating whether this group has been closed */
 	private volatile boolean closed;
 
 	// ------------------------------------------------------------------------
-	
-	public AbstractMetricGroup(MetricRegistry registry) {
+
+	public AbstractMetricGroup(MetricRegistry registry, String[] scope) {
 		this.registry = checkNotNull(registry);
+		this.scopeComponents = checkNotNull(scope);
 	}
 
+	/**
+	 * Gets the scope as an array of the scope components, for example
+	 * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
+	 * 
+	 * @see #getScopeString() 
+	 */
+	public String[] getScopeComponents() {
+		return scopeComponents;
+	}
+
+	/**
+	 * Gets the scope as a single delimited string, for example
+	 * {@code "host-7.taskmanager-2.window_word_count.my-mapper"}
+	 *
+	 * @see #getScopeComponents()
+	 */
+	public String getScopeString() {
+		if (scopeString == null) {
+			scopeString = ScopeFormat.concat(scopeComponents);
+		}
+		return scopeString;
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Closing
 	// ------------------------------------------------------------------------
@@ -111,25 +136,6 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 	}
 
 	// -----------------------------------------------------------------------------------------------------------------
-	//  Scope
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Generates the full scope based on the default/configured format that applies to all metrics within this group.
-	 *
-	 * @return generated scope
-	 */
-	public abstract List<String> generateScope();
-
-	/**
-	 * Generates the full scope based on the given format that applies to all metrics within this group.
-	 *
-	 * @param format format string
-	 * @return generated scope
-	 */
-	public abstract List<String> generateScope(Scope.ScopeFormat format);
-
-	// -----------------------------------------------------------------------------------------------------------------
 	//  Metrics
 	// -----------------------------------------------------------------------------------------------------------------
 
@@ -164,11 +170,8 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 	 * @param metric the metric to register
 	 */
 	protected void addMetric(String name, Metric metric) {
-		Matcher nameMatcher = METRIC_NAME_PATTERN.matcher(name);
-		if (!nameMatcher.matches()) {
-			throw new IllegalArgumentException("Metric names may not contain special characters or spaces. " +
-					"Allowed is: " + METRIC_NAME_REGEX);
-		}
+		// early reject names that will later cause issues
+		checkAllowedCharacters(name);
 
 		// add the metric only if the group is still open
 		synchronized (this) {
@@ -185,7 +188,7 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 						// we warn here, rather than failing, because metrics are tools that should not fail the
 						// program when used incorrectly
 						LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
-								name + "'. Metric might not get properly reported. (" + generateScope() + ')');
+								name + "'. Metric might not get properly reported. (" + scopeString + ')');
 					}
 
 					registry.register(metric, name, this);
@@ -197,7 +200,7 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 					// we warn here, rather than failing, because metrics are tools that should not fail the
 					// program when used incorrectly
 					LOG.warn("Name collision: Group already contains a Metric with the name '" +
-							name + "'. Metric will not be reported. (" + generateScope() + ')');
+							name + "'. Metric will not be reported. (" + scopeString + ')');
 				}
 			}
 		}
@@ -221,7 +224,7 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 				// program when used incorrectly
 				if (metrics.containsKey(name)) {
 					LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
-							name + "'. Metric might not get properly reported. (" + generateScope() + ')');
+							name + "'. Metric might not get properly reported. (" + scopeString + ')');
 				}
 
 				MetricGroup newGroup = new GenericMetricGroup(registry, this, name);
@@ -243,4 +246,21 @@ public abstract class AbstractMetricGroup implements MetricGroup {
 			}
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fast implementation to check if a string has only alphanumeric characters.
+	 * Compared to a regular expression, this is about an order of magnitude faster.
+	 */
+	private static void checkAllowedCharacters(String name) {
+		for (int i = 0; i < name.length(); i++) {
+			final char c = name.charAt(i);
+			if (c < 0x30 || (c >= 0x3a && c <= 0x40) || (c > 0x5a && c <= 0x60) || c > 0x7a) {
+				throw new IllegalArgumentException("Metric names may only contain [a-zA-Z0-9].");
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
index c68cdc1..518d940 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
@@ -21,46 +21,33 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.metrics.groups.Scope.SCOPE_WILDCARD;
-
 /**
  * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., 
  * TaskManager, Job, Task, Operator).
  * 
- * <p>The components metric groups contain functionality to define alias names for
- * the component identifier. For example, while Tasks are registered under a Task Attempt ID,
- * the metrics name scope may use the task name instead. Using these aliases makes
- * Metric scope names stable across jobs, recovery attempts, etc.
+ * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example
+ * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a
+ * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope.
+ *
+ * <p>Component groups, however, have configurable scopes. This allow users to include or exclude
+ * certain identifiers from the scope. The scope for metrics belonging to the "Task"
+ * group could for example include the task attempt number (more fine grained identification), or
+ * exclude it (for continuity of the namespace across failure and recovery).
  */
 @Internal
 public abstract class ComponentMetricGroup extends AbstractMetricGroup {
 
-	private final ComponentMetricGroup parent;
-
-	private final String format;
-	
-	// Map: scope variable -> specific value
-	protected final Map<String, String> formats;
-
-	// ------------------------------------------------------------------------
-
 	/**
 	 * Creates a new ComponentMetricGroup.
 	 *
-	 * @param registry    registry to register new metrics with
-	 * @param parentGroup parent group, may be null
-	 * @param scopeFormat default format string
+	 * @param registry     registry to register new metrics with
+	 * @param scope        the scope of the group
 	 */
-	public ComponentMetricGroup(MetricRegistry registry, ComponentMetricGroup parentGroup, String scopeFormat) {
-		super(registry);
-		this.formats = new HashMap<>();
-		this.parent = parentGroup;
-		this.format = scopeFormat;
+	public ComponentMetricGroup(
+			MetricRegistry registry,
+			String[] scope) {
+
+		super(registry, scope);
 	}
 
 	/**
@@ -87,71 +74,5 @@ public abstract class ComponentMetricGroup extends AbstractMetricGroup {
 	//  sub components
 	// ------------------------------------------------------------------------
 
-	protected ComponentMetricGroup parent() {
-		return parent;
-	}
-
 	protected abstract Iterable<? extends ComponentMetricGroup> subComponents();
-
-	// ------------------------------------------------------------------------
-	//  scope format
-	// ------------------------------------------------------------------------
-
-	protected abstract String getScopeFormat(Scope.ScopeFormat format);
-
-	@Override
-	public List<String> generateScope() {
-		return generateScope(format);
-	}
-
-	@Override
-	public List<String> generateScope(Scope.ScopeFormat format) {
-		return generateScope(getScopeFormat(format));
-	}
-
-	private List<String> generateScope(String format) {
-		String[] components = Scope.split(format);
-
-		List<String> scope = new ArrayList<>();
-		if (components[0].equals(SCOPE_WILDCARD)) {
-			if (this.parent != null) {
-				scope = this.parent.generateScope();
-			}
-			this.replaceFormats(components);
-			addToList(scope, components, 1);
-		} else {
-			if (this.parent != null) {
-				this.parent.replaceFormats(components);
-			}
-			this.replaceFormats(components);
-			addToList(scope, components, 0);
-		}
-		return scope;
-	}
-
-	private void replaceFormats(String[] components) {
-		if (this.parent != null) {
-			this.parent.replaceFormats(components);
-		}
-		for (int x = 0; x < components.length; x++) {
-			if (components[x].startsWith("<")) {
-				if (this.formats.containsKey(components[x])) {
-					components[x] = this.formats.get(components[x]);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Adds all elements from the given array, starting from the given index, to the given list.
-	 *
-	 * @param list       destination
-	 * @param array      source
-	 * @param startIndex array index to start from
-	 */
-	private static void addToList(List<String> list, String[] array, int startIndex) {
-		for (int x = startIndex; x < array.length; x++) {
-			list.add(array[x]);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
index eedb0fa..ddcd73b 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
@@ -15,44 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
 
-import java.util.List;
-
 /**
- * A simple named {@link org.apache.flink.metrics.MetricGroup} with no special properties.
+ * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold
+ * subgroups of metrics.
  */
 @Internal
 public class GenericMetricGroup extends AbstractMetricGroup {
-	
-	private final AbstractMetricGroup parent;
-
-	private final String name;
-
-	protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, int name) {
-		this(registry, parent, String.valueOf(name));
-	}
-
-	protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
-		super(registry);
-		this.parent = parent;
-		this.name = name;
-	}
 
-	@Override
-	public List<String> generateScope() {
-		List<String> scope = parent.generateScope();
-		scope.add(name);
-		return scope;
+	public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
+		super(registry, makeScopeComponents(parent, name));
 	}
 
-	@Override
-	public List<String> generateScope(Scope.ScopeFormat format) {
-		List<String> scope = parent.generateScope(format);
-		scope.add(name);
-		return scope;
+	// ------------------------------------------------------------------------
+
+	private static String[] makeScopeComponents(AbstractMetricGroup parent, String name) {
+		if (parent != null) {
+			String[] parentComponents = parent.getScopeComponents();
+			if (parentComponents != null && parentComponents.length > 0) {
+				String[] parts = new String[parentComponents.length + 1];
+				System.arraycopy(parentComponents, 0, parts, 0, parentComponents.length);
+				parts[parts.length - 1] = name;
+				return parts;
+			}
+		}
+		return new String[] { name };
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
index dea5650..b34c844 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
@@ -15,19 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricRegistry;
 
-import java.util.List;
-
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} that contains shareable pre-defined IO-related metrics.
  */
 public class IOMetricGroup extends AbstractMetricGroup {
-	
-	private final TaskMetricGroup parent;
 
 	private final Counter numBytesIn;
 	private final Counter numBytesOut;
@@ -35,37 +32,27 @@ public class IOMetricGroup extends AbstractMetricGroup {
 	private final Counter numRecordsOut;
 
 	public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) {
-		super(registry);
-		this.parent = parent;
+		super(registry, parent.getScopeComponents());
+
 		this.numBytesIn = parent.counter("numBytesIn");
 		this.numBytesOut = parent.counter("numBytesOut");
 		this.numRecordsIn = parent.counter("numRecordsIn");
 		this.numRecordsOut = parent.counter("numRecordsOut");
 	}
 
-	@Override
-	public List<String> generateScope() {
-		return parent.generateScope();
-	}
-
-	@Override
-	public List<String> generateScope(Scope.ScopeFormat format) {
-		return parent.generateScope(format);
-	}
-
 	public Counter getBytesInCounter() {
-		return this.numBytesIn;
+		return numBytesIn;
 	}
 
 	public Counter getBytesOutCounter() {
-		return this.numBytesOut;
+		return numBytesOut;
 	}
 
 	public Counter getRecordsInCounter() {
-		return this.numRecordsIn;
+		return numRecordsIn;
 	}
 
 	public Counter getRecordsOutCounter() {
-		return this.numRecordsOut;
+		return numRecordsOut;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
index f4f634a..f816278 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
@@ -21,57 +21,92 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a Job.
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
+ * a specific job, running on the TaskManager.
  * 
- * <p>Contains extra logic for adding tasks.
+ * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
  */
 @Internal
 public class JobMetricGroup extends ComponentMetricGroup {
 
-	public static final String SCOPE_JOB_DESCRIPTOR = "job";
-	public static final String SCOPE_JOB_ID = Scope.format("job_id");
-	public static final String SCOPE_JOB_NAME = Scope.format("job_name");
-	public static final String DEFAULT_SCOPE_JOB_COMPONENT = Scope.concat(SCOPE_JOB_NAME);
-	public static final String DEFAULT_SCOPE_JOB = Scope.concat(DEFAULT_SCOPE_TM, DEFAULT_SCOPE_JOB_COMPONENT);
-
-	// ------------------------------------------------------------------------
+	/** The metrics group that contains this group */
+	private final TaskManagerMetricGroup parent;
 
 	/** Map from execution attempt ID (task identifier) to task metrics */
 	private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
-	
+
+	/** The ID of the job represented by this metrics group */
 	private final JobID jobId;
 
+	/** The name of the job represented by this metrics group */
+	@Nullable
+	private final String jobName;
+
 	// ------------------------------------------------------------------------
 
-	public JobMetricGroup(MetricRegistry registry, TaskManagerMetricGroup taskManager, JobID jobId, String jobName) {
-		super(registry, taskManager, registry.getScopeConfig().getJobFormat());
+	public JobMetricGroup(
+			MetricRegistry registry,
+			TaskManagerMetricGroup parent,
+			JobID jobId,
+			@Nullable String jobName) {
 		
+		this(registry, checkNotNull(parent), registry.getScopeFormats().getJobFormat(), jobId, jobName);
+	}
+
+	public JobMetricGroup(
+			MetricRegistry registry,
+			TaskManagerMetricGroup parent,
+			TaskManagerJobScopeFormat scopeFormat, 
+			JobID jobId,
+			@Nullable String jobName) {
+
+		super(registry, scopeFormat.formatScope(parent, jobId, jobName));
+
+		this.parent = checkNotNull(parent);
 		this.jobId = checkNotNull(jobId);
-		this.formats.put(SCOPE_JOB_ID, jobId.toString());
-		this.formats.put(SCOPE_JOB_NAME, checkNotNull(jobName));
+		this.jobName = jobName;
+	}
+
+	public final TaskManagerMetricGroup parent() {
+		return parent;
+	}
+
+	public JobID jobId() {
+		return jobId;
+	}
+
+	@Nullable
+	public String jobName() {
+		return jobName;
 	}
 
 	// ------------------------------------------------------------------------
 	//  adding / removing tasks
 	// ------------------------------------------------------------------------
 
-	public TaskMetricGroup addTask(AbstractID vertexId, AbstractID executionId, int subtaskIndex, String name) {
-		checkNotNull(vertexId);
+	public TaskMetricGroup addTask(
+			AbstractID vertexId,
+			AbstractID executionId,
+			String taskName,
+			int subtaskIndex,
+			int attemptNumber) {
+		
 		checkNotNull(executionId);
-		checkNotNull(name);
 
 		synchronized (this) {
 			if (!isClosed()) {
-				TaskMetricGroup task = new TaskMetricGroup(registry, this, vertexId, executionId, subtaskIndex, name);
+				TaskMetricGroup task = new TaskMetricGroup(registry, this, 
+						vertexId, executionId, taskName, subtaskIndex, attemptNumber);
 				tasks.put(executionId, task);
 				return task;
 			} else {
@@ -82,7 +117,7 @@ public class JobMetricGroup extends ComponentMetricGroup {
 
 	public void removeTaskMetricGroup(AbstractID executionId) {
 		checkNotNull(executionId);
-		
+
 		boolean removeFromParent = false;
 		synchronized (this) {
 			if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
@@ -91,25 +126,16 @@ public class JobMetricGroup extends ComponentMetricGroup {
 				close();
 			}
 		}
-		
+
 		// IMPORTANT: removing from the parent must happen while holding the this group's lock,
 		//      because it would violate the "first parent then subgroup" lock acquisition order
 		if (removeFromParent) {
-			((TaskManagerMetricGroup) parent()).removeJobMetricsGroup(jobId, this);
+			parent.removeJobMetricsGroup(jobId, this);
 		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  component group behavior
-	// ------------------------------------------------------------------------
-	
 	@Override
 	protected Iterable<? extends ComponentMetricGroup> subComponents() {
 		return tasks.values();
 	}
-
-	@Override
-	protected String getScopeFormat(Scope.ScopeFormat format) {
-		return format.getJobFormat();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
deleted file mode 100644
index 1bfcfe3..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-
-/**
- * Metrics group that does not register any metrics.
- */
-@Internal
-public class NonRegisteringMetricsGroup implements MetricGroup {
-
-	// ------------------------------------------------------------------------
-	//  singleton
-	// ------------------------------------------------------------------------
-
-	private static final NonRegisteringMetricsGroup INSTANCE = new NonRegisteringMetricsGroup();
-
-	public static NonRegisteringMetricsGroup get() {
-		return INSTANCE;
-	}
-
-	/** Private constructor to prevent instantiation */
-	private NonRegisteringMetricsGroup() {}
-
-	// ------------------------------------------------------------------------
-	//  metrics group
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void close() {}
-
-	@Override
-	public boolean isClosed() {
-		return false;
-	}
-
-	@Override
-	public Counter counter(int name) {
-		return new Counter();
-	}
-
-	@Override
-	public Counter counter(String name) {
-		return new Counter();
-	}
-
-	@Override
-	public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
-		return gauge;
-	}
-
-	@Override
-	public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
-		return gauge;
-	}
-
-	
-	@Override
-	public MetricGroup addGroup(int name) {
-		return addGroup(String.valueOf(name));
-	}
-
-	@Override
-	public MetricGroup addGroup(String name) {
-		return new NonRegisteringMetricsGroup();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
index 390b55b..6db79ab 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
 
 import java.util.Collections;
 
-import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator.
@@ -30,26 +32,29 @@ import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
 @Internal
 public class OperatorMetricGroup extends ComponentMetricGroup {
 
-	public static final String SCOPE_OPERATOR_DESCRIPTOR = "operator";
-	public static final String SCOPE_OPERATOR_NAME = Scope.format("operator_name");
-	public static final String SCOPE_OPERATOR_SUBTASK_INDEX = Scope.format("subtask_index");
-	public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = Scope.concat(SCOPE_OPERATOR_NAME, SCOPE_OPERATOR_SUBTASK_INDEX);
-	public static final String DEFAULT_SCOPE_OPERATOR = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_OPERATOR_COMPONENT);
-
-	protected OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup task, String name, int subTaskIndex) {
-		super(registry, task, registry.getScopeConfig().getOperatorFormat());
+	/** The task metric group that contains this operator metric groups */
+	private final TaskMetricGroup parent;
 
-		this.formats.put(SCOPE_OPERATOR_NAME, name);
-		this.formats.put(SCOPE_OPERATOR_SUBTASK_INDEX, String.valueOf(subTaskIndex));
+	public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) {
+		this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName);
 	}
 
-	// ------------------------------------------------------------------------
+	public OperatorMetricGroup(
+			MetricRegistry registry,
+			TaskMetricGroup parent,
+			OperatorScopeFormat scopeFormat,
+			String operatorName) {
 
-	@Override
-	protected String getScopeFormat(Scope.ScopeFormat format) {
-		return format.getOperatorFormat();
+		super(registry, scopeFormat.formatScope(parent, operatorName));
+		this.parent = checkNotNull(parent);
 	}
 
+	// ------------------------------------------------------------------------
+	
+	public final TaskMetricGroup parent() {
+		return parent;
+	}
+	
 	@Override
 	protected Iterable<? extends ComponentMetricGroup> subComponents() {
 		return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
deleted file mode 100644
index 83013e2..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * This class provides utility-functions for handling scopes.
- */
-@Internal
-public class Scope {
-	public static final String SCOPE_WILDCARD = "*";
-
-	private static final String SCOPE_PREFIX = "<";
-	private static final String SCOPE_SUFFIX = ">";
-	private static final String SCOPE_SPLIT = ".";
-
-	private Scope() {
-	}
-
-	/**
-	 * Modifies the given string to resemble a scope variable.
-	 *
-	 * @param scope string to format
-	 * @return formatted string
-	 */
-	public static String format(String scope) {
-		return SCOPE_PREFIX + scope + SCOPE_SUFFIX;
-	}
-
-	/**
-	 * Joins the given components into a single scope.
-	 *
-	 * @param components components to join
-	 * @return joined scoped
-	 */
-	public static String concat(String... components) {
-		StringBuilder sb = new StringBuilder();
-		sb.append(components[0]);
-		for (int x = 1; x < components.length; x++) {
-			sb.append(SCOPE_SPLIT);
-			sb.append(components[x]);
-		}
-		return sb.toString();
-	}
-
-	/**
-	 * Splits the given scope into it's individual components.
-	 *
-	 * @param scope scope to split
-	 * @return array of components
-	 */
-	public static String[] split(String scope) {
-		return scope.split("\\" + SCOPE_SPLIT);
-	}
-
-	/**
-	 * Simple container for component scope format strings.
-	 */
-	public static class ScopeFormat {
-		
-		private String operatorFormat = OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR;
-		private String taskFormat = TaskMetricGroup.DEFAULT_SCOPE_TASK;
-		private String jobFormat = JobMetricGroup.DEFAULT_SCOPE_JOB;
-		private String taskManagerFormat = TaskManagerMetricGroup.DEFAULT_SCOPE_TM;
-		
-
-		public ScopeFormat setOperatorFormat(String format) {
-			this.operatorFormat = format;
-			return this;
-		}
-
-		public ScopeFormat setTaskFormat(String format) {
-			this.taskFormat = format;
-			return this;
-		}
-
-		public ScopeFormat setJobFormat(String format) {
-			this.jobFormat = format;
-			return this;
-		}
-
-		public ScopeFormat setTaskManagerFormat(String format) {
-			this.taskManagerFormat = format;
-			return this;
-		}
-
-		public String getOperatorFormat() {
-			return this.operatorFormat;
-		}
-
-		public String getTaskFormat() {
-			return this.taskFormat;
-		}
-
-		public String getJobFormat() {
-			return this.jobFormat;
-		}
-
-		public String getTaskManagerFormat() {
-			return this.taskManagerFormat;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
index bfb9362..3cb3936 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
@@ -21,13 +21,12 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager.
  *
@@ -37,22 +36,33 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskManagerMetricGroup extends ComponentMetricGroup {
 
-	public static final String SCOPE_HOST_DESCRIPTOR = "host";
-	public static final String SCOPE_TM_DESCRIPTOR = "taskmanager";
-	public static final String SCOPE_TM_HOST = Scope.format("host");
-	public static final String SCOPE_TM_ID = Scope.format("tm_id");
-	public static final String DEFAULT_SCOPE_TM_COMPONENT = Scope.concat(SCOPE_TM_HOST, "taskmanager", SCOPE_TM_ID);
-	public static final String DEFAULT_SCOPE_TM = DEFAULT_SCOPE_TM_COMPONENT;
-
-	// ------------------------------------------------------------------------
-	
 	private final Map<JobID, JobMetricGroup> jobs = new HashMap<>();
 
-	public TaskManagerMetricGroup(MetricRegistry registry, String host, String taskManagerId) {
-		super(registry, null, registry.getScopeConfig().getTaskManagerFormat());
+	private final String hostname;
+
+	private final String taskManagerId;
 
-		this.formats.put(SCOPE_TM_HOST, checkNotNull(host));
-		this.formats.put(SCOPE_TM_ID, checkNotNull(taskManagerId));
+
+	public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) {
+		this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId);
+	}
+
+	public TaskManagerMetricGroup(
+			MetricRegistry registry,
+			TaskManagerScopeFormat scopeFormat,
+			String hostname, String taskManagerId) {
+
+		super(registry, scopeFormat.formatScope(hostname, taskManagerId));
+		this.hostname = hostname;
+		this.taskManagerId = taskManagerId;
+	}
+
+	public String hostname() {
+		return hostname;
+	}
+
+	public String taskManagerId() {
+		return taskManagerId;
 	}
 
 	// ------------------------------------------------------------------------
@@ -64,9 +74,10 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
 			String jobName,
 			AbstractID vertexID,
 			AbstractID executionId,
+			String taskName,
 			int subtaskIndex,
-			String taskName) {
-		
+			int attemptNumber) {
+
 		// we cannot strictly lock both our map modification and the job group modification
 		// because it might lead to a deadlock
 		while (true) {
@@ -80,28 +91,30 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
 					jobs.put(jobId, currentJobGroup);
 				}
 			}
-		
+
 			// try to add another task. this may fail if we found a pre-existing job metrics
 			// group and it is closed concurrently
-			TaskMetricGroup taskGroup = currentJobGroup.addTask(vertexID, executionId, subtaskIndex, taskName);
+			TaskMetricGroup taskGroup = currentJobGroup.addTask(
+					vertexID, executionId, taskName, subtaskIndex, attemptNumber);
+
 			if (taskGroup != null) {
 				// successfully added the next task
 				return taskGroup;
 			}
-			
+
 			// else fall through the loop
 		}
 	}
-	
+
 	public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) {
 		if (jobId == null || group == null || !group.isClosed()) {
 			return;
 		}
-		
+
 		synchronized (this) {
 			// optimistically remove the currently contained group, and check later if it was correct
 			JobMetricGroup containedGroup = jobs.remove(jobId);
-			
+
 			// check if another group was actually contained, and restore that one
 			if (containedGroup != null && containedGroup != group) {
 				jobs.put(jobId, containedGroup);
@@ -113,15 +126,6 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup {
 		return jobs.size();
 	}
 
-	// ------------------------------------------------------------------------
-	//  component group behavior
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected String getScopeFormat(Scope.ScopeFormat format) {
-		return format.getTaskManagerFormat();
-	}
-
 	@Override
 	protected Iterable<? extends ComponentMetricGroup> subComponents() {
 		return jobs.values();

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
index 316c84f..784578b 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
@@ -20,12 +20,13 @@ package org.apache.flink.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -35,47 +36,109 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class TaskMetricGroup extends ComponentMetricGroup {
-	
-	public static final String SCOPE_TASK_DESCRIPTOR = "task";
-	public static final String SCOPE_TASK_ID = Scope.format("task_id");
-	public static final String SCOPE_TASK_NAME = Scope.format("task_name");
-	public static final String SCOPE_TASK_ATTEMPT = Scope.format("task_attempt");
-	public static final String SCOPE_TASK_SUBTASK_INDEX = Scope.format("subtask_index");
-	public static final String DEFAULT_SCOPE_TASK_COMPONENT = SCOPE_TASK_NAME;
-	public static final String DEFAULT_SCOPE_TASK = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_TASK_COMPONENT);
 
+	/** The job metrics group containing this task metrics group */
+	private final JobMetricGroup parent;
 
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
 	private final IOMetricGroup ioMetrics;
-
+	
+	/** The execution Id uniquely identifying the executed task represented by this metrics group */
 	private final AbstractID executionId;
+
+	@Nullable
+	private final AbstractID vertexId;
 	
+	@Nullable
+	private final String taskName;
+
 	private final int subtaskIndex;
 
-	protected TaskMetricGroup(
+	private final int attemptNumber;
+
+	// ------------------------------------------------------------------------
+
+	public TaskMetricGroup(
 			MetricRegistry registry,
 			JobMetricGroup parent,
-			AbstractID taskId,
+			@Nullable AbstractID vertexId,
 			AbstractID executionId,
+			@Nullable String taskName,
 			int subtaskIndex,
-			String name) {
+			int attemptNumber) {
+		
+		this(registry, parent, registry.getScopeFormats().getTaskFormat(),
+				vertexId, executionId, taskName, subtaskIndex, attemptNumber);
+	}
 
-		super(registry, parent, registry.getScopeConfig().getTaskFormat());
+	public TaskMetricGroup(
+			MetricRegistry registry,
+			JobMetricGroup parent,
+			TaskScopeFormat scopeFormat, 
+			@Nullable AbstractID vertexId,
+			AbstractID executionId,
+			@Nullable String taskName,
+			int subtaskIndex,
+			int attemptNumber) {
 
-		this.executionId = executionId;
+		super(registry, scopeFormat.formatScope(
+				parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber));
+
+		this.parent = checkNotNull(parent);
+		this.executionId = checkNotNull(executionId);
+		this.vertexId = vertexId;
+		this.taskName = taskName;
 		this.subtaskIndex = subtaskIndex;
-		
-		this.formats.put(SCOPE_TASK_ID, taskId.toString());
-		this.formats.put(SCOPE_TASK_ATTEMPT, executionId.toString());
-		this.formats.put(SCOPE_TASK_NAME, checkNotNull(name));
-		this.formats.put(SCOPE_TASK_SUBTASK_INDEX, String.valueOf(subtaskIndex));
+		this.attemptNumber = attemptNumber;
 
 		this.ioMetrics = new IOMetricGroup(registry, this);
 	}
 
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	public final JobMetricGroup parent() {
+		return parent;
+	}
+
+	public AbstractID executionId() {
+		return executionId;
+	}
+
+	@Nullable
+	public AbstractID vertexId() {
+		return vertexId;
+	}
+
+	@Nullable
+	public String taskName() {
+		return taskName;
+	}
+
+	public int subtaskIndex() {
+		return subtaskIndex;
+	}
+
+	public int attemptNumber() {
+		return attemptNumber;
+	}
+
+	/**
+	 * Returns the IOMetricGroup for this task.
+	 *
+	 * @return IOMetricGroup for this task.
+	 */
+	public IOMetricGroup getIOMetricGroup() {
+		return ioMetrics;
+	}
+
+	// ------------------------------------------------------------------------
+	//  operators and cleanup
+	// ------------------------------------------------------------------------
 	public OperatorMetricGroup addOperator(String name) {
-		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name, this.subtaskIndex);
+		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
 
 		synchronized (this) {
 			OperatorMetricGroup previous = operators.put(name, operator);
@@ -93,27 +156,10 @@ public class TaskMetricGroup extends ComponentMetricGroup {
 	@Override
 	public void close() {
 		super.close();
-		parent().removeTaskMetricGroup(executionId);
-	}
-
-	/**
-	 * Returns the IOMetricGroup for this task.
-	 *
-	 * @return IOMetricGroup for this task.
-	 */
-	public IOMetricGroup getIOMetricGroup() {
-		return this.ioMetrics;
-	}
-
-	@Override
-	protected JobMetricGroup parent() {
-		return (JobMetricGroup) super.parent();
+		parent.removeTaskMetricGroup(executionId);
 	}
 
-	@Override
-	protected String getScopeFormat(Scope.ScopeFormat format) {
-		return format.getTaskFormat();
-	}
+	// ------------------------------------------------------------------------
 
 	@Override
 	protected Iterable<? extends ComponentMetricGroup> subComponents() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
new file mode 100644
index 0000000..961bcce
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * A special {@link MetricGroup} that does not register any metrics at the metrics registry
+ * and any reporters.
+ * 
+ * <p>This metrics group appears always closed ({@link #isClosed()}).
+ */
+@Internal
+public class UnregisteredMetricsGroup implements MetricGroup {
+
+	@Override
+	public void close() {}
+
+	@Override
+	public boolean isClosed() {
+		return true;
+	}
+
+	@Override
+	public Counter counter(int name) {
+		return new Counter();
+	}
+
+	@Override
+	public Counter counter(String name) {
+		return new Counter();
+	}
+
+	@Override
+	public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+		return gauge;
+	}
+
+	@Override
+	public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+		return gauge;
+	}
+
+	
+	@Override
+	public MetricGroup addGroup(int name) {
+		return addGroup(String.valueOf(name));
+	}
+
+	@Override
+	public MetricGroup addGroup(String name) {
+		return new UnregisteredMetricsGroup();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
new file mode 100644
index 0000000..9637f65
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups.scope;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents the format after which the "scope" (or namespace) of the various
+ * component metric groups is built. Component metric groups
+ * ({@link org.apache.flink.metrics.groups.ComponentMetricGroup}), are for example
+ * "TaskManager", "Task", or "Operator".
+ *
+ * <p>User defined scope formats allow users to include or exclude
+ * certain identifiers from the scope. The scope for metrics belonging to the "Task"
+ * group could for example include the task attempt number (more fine grained identification), or
+ * exclude it (continuity of the namespace across failure and recovery).
+ */
+public abstract class ScopeFormat {
+
+	// ------------------------------------------------------------------------
+	//  Scope Format Special Characters
+	// ------------------------------------------------------------------------
+
+	/**
+	 * If the scope format starts with this character, then the parent components scope
+	 * format will be used as a prefix.
+	 * 
+	 * <p>For example, if the {@link JobMetricGroup} format is {@code "*.<job_name>"}, and the
+	 * {@link TaskManagerMetricGroup} format is {@code "<host>"}, then the job's metrics
+	 * will have {@code "<host>.<job_name>"} as their scope.
+	 */
+	public static final String SCOPE_INHERIT_PARENT = "*";
+
+	public static final String SCOPE_SEPARATOR = ".";
+
+	private static final String SCOPE_VARIABLE_PREFIX = "<";
+	private static final String SCOPE_VARIABLE_SUFFIX = ">";
+
+	// ------------------------------------------------------------------------
+	//  Scope Variables
+	// ------------------------------------------------------------------------
+
+	// ----- Task Manager ----
+
+	public static final String SCOPE_TASKMANAGER_HOST = asVariable("host");
+	public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
+
+	/** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
+	public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
+			concat(SCOPE_TASKMANAGER_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
+
+	/** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
+	public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
+
+	// ----- Job on Task Manager ----
+
+	public static final String SCOPE_JOB_ID = asVariable("job_id");
+	public static final String SCOPE_JOB_NAME = asVariable("job_name");
+
+	/** The default scope format for the job component: {@code "<job_name>"} */
+	public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = SCOPE_JOB_NAME;
+
+	/** The default scope format for all job metrics: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
+	public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
+			concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT);
+
+	// ----- Task ----
+
+	public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
+	public static final String SCOPE_TASK_NAME = asVariable("task_name");
+	public static final String SCOPE_TASK_ATTEMPT_ID = asVariable("task_attempt_id");
+	public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num");
+	public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index");
+
+	/** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */
+	public static final String DEFAULT_SCOPE_TASK_COMPONENT =
+			concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
+
+	/** The default scope format for all task metrics:
+	 * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
+	public static final String DEFAULT_SCOPE_TASK_GROUP =
+			concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT);
+
+	// ----- Operator ----
+
+	public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");
+
+	/** The default scope added by the operator component: "<operator_name>.<subtask_index>" */
+	public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
+			concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
+
+	/** The default scope format for all operator metrics:
+	 * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
+	public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
+			concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT);
+
+	// ------------------------------------------------------------------------
+	//  Formatters form the individual component types
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The scope format for the {@link TaskManagerMetricGroup}.
+	 */
+	public static class TaskManagerScopeFormat extends ScopeFormat {
+
+		public TaskManagerScopeFormat(String format) {
+			super(format, null, new String[] {
+					SCOPE_TASKMANAGER_HOST,
+					SCOPE_TASKMANAGER_ID
+			});
+		}
+
+		public String[] formatScope(String hostname, String taskManagerId) {
+			final String[] template = copyTemplate();
+			final String[] values = { hostname, taskManagerId };
+			return bindVariables(template, values);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The scope format for the {@link JobMetricGroup}.
+	 */
+	public static class TaskManagerJobScopeFormat extends ScopeFormat {
+
+		public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) {
+			super(format, parentFormat, new String[] {
+					SCOPE_TASKMANAGER_HOST,
+					SCOPE_TASKMANAGER_ID,
+					SCOPE_JOB_ID,
+					SCOPE_JOB_NAME
+			});
+		}
+
+		public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) {
+			final String[] template = copyTemplate();
+			final String[] values = {
+					parent.hostname(),
+					parent.taskManagerId(),
+					valueOrNull(jid),
+					valueOrNull(jobName)
+			};
+			return bindVariables(template, values);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The scope format for the {@link TaskMetricGroup}.
+	 */
+	public static class TaskScopeFormat extends ScopeFormat {
+
+		public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) {
+			super(format, parentFormat, new String[] {
+					SCOPE_TASKMANAGER_HOST,
+					SCOPE_TASKMANAGER_ID,
+					SCOPE_JOB_ID,
+					SCOPE_JOB_NAME,
+					SCOPE_TASK_VERTEX_ID,
+					SCOPE_TASK_ATTEMPT_ID,
+					SCOPE_TASK_NAME,
+					SCOPE_TASK_SUBTASK_INDEX,
+					SCOPE_TASK_ATTEMPT_NUM
+			});
+		}
+
+		public String[] formatScope(
+				JobMetricGroup parent,
+				AbstractID vertexId, AbstractID attemptId,
+				String taskName, int subtask, int attemptNumber) {
+
+			final String[] template = copyTemplate();
+			final String[] values = {
+					parent.parent().hostname(),
+					parent.parent().taskManagerId(),
+					valueOrNull(parent.jobId()),
+					valueOrNull(parent.jobName()),
+					valueOrNull(vertexId),
+					valueOrNull(attemptId),
+					valueOrNull(taskName),
+					String.valueOf(subtask),
+					String.valueOf(attemptNumber)
+			};
+			return bindVariables(template, values);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The scope format for the {@link org.apache.flink.metrics.groups.OperatorMetricGroup}.
+	 */
+	public static class OperatorScopeFormat extends ScopeFormat {
+
+		public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
+			super(format, parentFormat, new String[] {
+					SCOPE_TASKMANAGER_HOST,
+					SCOPE_TASKMANAGER_ID,
+					SCOPE_JOB_ID,
+					SCOPE_JOB_NAME,
+					SCOPE_TASK_VERTEX_ID,
+					SCOPE_TASK_ATTEMPT_ID,
+					SCOPE_TASK_NAME,
+					SCOPE_TASK_SUBTASK_INDEX,
+					SCOPE_TASK_ATTEMPT_NUM,
+					SCOPE_OPERATOR_NAME
+			});
+		}
+
+		public String[] formatScope(TaskMetricGroup parent, String operatorName) {
+
+			final String[] template = copyTemplate();
+			final String[] values = {
+					parent.parent().parent().hostname(),
+					parent.parent().parent().taskManagerId(),
+					valueOrNull(parent.parent().jobId()),
+					valueOrNull(parent.parent().jobName()),
+					valueOrNull(parent.vertexId()),
+					valueOrNull(parent.executionId()),
+					valueOrNull(parent.taskName()),
+					String.valueOf(parent.subtaskIndex()),
+					String.valueOf(parent.attemptNumber()),
+					valueOrNull(operatorName)
+			};
+			return bindVariables(template, values);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Scope Format Base
+	// ------------------------------------------------------------------------
+
+	/** The scope format */
+	private final String format;
+
+	/** The format, split into components */
+	private final String[] template;
+
+	private final int[] templatePos;
+
+	private final int[] valuePos;
+
+	// ------------------------------------------------------------------------
+
+	protected ScopeFormat(String format, ScopeFormat parent, String[] variables) {
+		checkNotNull(format, "format is null");
+
+		final String[] rawComponents = format.split("\\" + SCOPE_SEPARATOR);
+
+		// compute the template array
+		final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT);
+		if (parentAsPrefix) {
+			if (parent == null) {
+				throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '"
+					+ SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component).");
+			}
+
+			this.format = format.length() > 2 ? format.substring(2) : "<empty>";
+
+			String[] parentTemplate = parent.template;
+			int parentLen = parentTemplate.length;
+			
+			this.template = new String[parentLen + rawComponents.length - 1];
+			System.arraycopy(parentTemplate, 0, this.template, 0, parentLen);
+			System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1);
+		}
+		else {
+			this.format = format.isEmpty() ? "<empty>" : format;
+			this.template = rawComponents;
+		}
+
+		// --- compute the replacement matrix ---
+		// a bit of clumsy Java collections code ;-)
+		
+		HashMap<String, Integer> varToValuePos = arrayToMap(variables);
+		List<Integer> templatePos = new ArrayList<>();
+		List<Integer> valuePos = new ArrayList<>();
+
+		for (int i = 0; i < template.length; i++) {
+			final String component = template[i];
+			
+			// check if that is a variable
+			if (component != null && component.length() >= 3 &&
+					component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') {
+
+				// this is a variable
+				Integer replacementPos = varToValuePos.get(component);
+				if (replacementPos != null) {
+					templatePos.add(i);
+					valuePos.add(replacementPos);
+				}
+			}
+		}
+
+		this.templatePos = integerListToArray(templatePos);
+		this.valuePos = integerListToArray(valuePos);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public String format() {
+		return format;
+	}
+
+	protected final String[] copyTemplate() {
+		String[] copy = new String[template.length];
+		System.arraycopy(template, 0, copy, 0, template.length);
+		return copy;
+	}
+
+	protected final String[] bindVariables(String[] template, String[] values) {
+		final int len = templatePos.length;
+		for (int i = 0; i < len; i++) {
+			template[templatePos[i]] = values[valuePos[i]];
+		}
+		return template;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "ScopeFormat '" + format + '\'';
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Formats the given string to resemble a scope variable.
+	 *
+	 * @param scope The string to format
+	 * @return The formatted string
+	 */
+	public static String asVariable(String scope) {
+		return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX;
+	}
+
+	public static String concat(String... components) {
+		StringBuilder sb = new StringBuilder();
+		sb.append(components[0]);
+		for (int x = 1; x < components.length; x++) {
+			sb.append(SCOPE_SEPARATOR);
+			sb.append(components[x]);
+		}
+		return sb.toString();
+	}
+	
+	static String valueOrNull(Object value) {
+		return (value == null || (value instanceof String && ((String) value).isEmpty())) ?
+				"null" : value.toString();
+	}
+
+	static HashMap<String, Integer> arrayToMap(String[] array) {
+		HashMap<String, Integer> map = new HashMap<>(array.length);
+		for (int i = 0; i < array.length; i++) {
+			map.put(array[i], i);
+		}
+		return map;
+	}
+
+	private static int[] integerListToArray(List<Integer> list) {
+		int[] array = new int[list.size()];
+		int pos = 0;
+		for (Integer i : list) {
+			array[pos++] = i;
+		}
+		return array;
+	}
+}


Mime
View raw message