flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/13] flink git commit: [FLINK-4769] [metrics] Port metric config parameters to ConfigOptions
Date Fri, 21 Apr 2017 12:24:23 GMT
[FLINK-4769] [metrics] Port metric config parameters to ConfigOptions

This closes #3687


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b77cc3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b77cc3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b77cc3f

Branch: refs/heads/master
Commit: 1b77cc3f2682afd839ce6b16cd1658bbd01a6b04
Parents: c0ea74a
Author: zentol <chesnay@apache.org>
Authored: Thu Apr 6 13:47:33 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Apr 21 12:31:30 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../flink/configuration/ConfigConstants.java    | 44 +++++-----
 .../flink/configuration/MetricOptions.java      | 88 ++++++++++++++++++++
 .../ScheduledDropwizardReporterTest.java        |  7 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |  3 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  9 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  5 +-
 .../metrics/statsd/StatsDReporterTest.java      | 11 +--
 .../metrics/MetricRegistryConfiguration.java    | 24 ++----
 .../runtime/metrics/scope/ScopeFormat.java      | 49 -----------
 .../runtime/metrics/scope/ScopeFormats.java     | 32 +++----
 .../runtime/metrics/MetricRegistryTest.java     | 29 +++----
 .../metrics/groups/AbstractMetricGroupTest.java |  7 +-
 .../metrics/groups/JobManagerGroupTest.java     |  4 +-
 .../metrics/groups/JobManagerJobGroupTest.java  | 10 +--
 .../groups/MetricGroupRegistrationTest.java     |  3 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  4 +-
 .../metrics/groups/TaskManagerJobGroupTest.java | 10 +--
 .../metrics/groups/TaskMetricGroupTest.java     | 10 +--
 .../api/operators/AbstractStreamOperator.java   |  8 +-
 20 files changed, 196 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 0c6bfa9..a21a239 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -116,7 +117,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
-		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+		flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		return flinkConfig;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a035beb..61c1b27 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -990,21 +990,8 @@ public final class ConfigConstants {
 
 	// ---------------------------- Metrics -----------------------------------
 
-	/**
-	 * The list of named reporters. Names are defined here and per-reporter configs
-	 * are given with the reporter config prefix and the reporter name.
-	 *
-	 * Example:
-	 * <pre>{@code
-	 * metrics.reporters = foo, bar
-	 *
-	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
-	 * metrics.reporter.foo.interval = 10
-	 *
-	 * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
-	 * metrics.reporter.bar.port = 1337
-	 * }</pre>
-	 */
+	/** @deprecated Use {@link MetricOptions#REPORTERS_LIST} instead. */
+	@Deprecated
 	public static final String METRICS_REPORTERS_LIST = "metrics.reporters";
 
 	/**
@@ -1022,28 +1009,36 @@ public final class ConfigConstants {
 	/**	The delimiter used to assemble the metric identifier. This is used as a suffix in an actual reporter config. */
 	public static final String METRICS_REPORTER_SCOPE_DELIMITER = "scope.delimiter";
 
-	/** The delimiter used to assemble the metric identifier. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_DELIMITER} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter";
 
-	/** The scope format string that is applied to all metrics scoped to a JobManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
 
-	/** The scope format string that is applied to all metrics scoped to a TaskManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
 
-	/** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM_JOB} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job";
 
-	/** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM_JOB} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job";
 
-	/** The scope format string that is applied to all metrics scoped to a task. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TASK} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
 
-	/** The scope format string that is applied to all metrics scoped to an operator. */
+	/** @deprecated Use {@link MetricOptions#SCOPE_NAMING_OPERATOR} instead. */
+	@Deprecated
 	public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
 
-	/** The number of measured latencies to maintain at each operator */
+	/** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */
+	@Deprecated
 	public static final String METRICS_LATENCY_HISTORY_SIZE = "metrics.latency.history-size";
 
 
@@ -1498,7 +1493,8 @@ public final class ConfigConstants {
 
 	// ----------------------------- Metrics ----------------------------
 
-	/** The default number of measured latencies to maintain at each operator */
+	/** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */
+	@Deprecated
 	public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128;
 
 	// ----------------------------- Environment Variables ----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
new file mode 100644
index 0000000..8a328ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class MetricOptions {
+
+	/**
+	 * The list of named reporters. Names are defined here and per-reporter configs
+	 * are given with the reporter config prefix and the reporter name.
+	 *
+	 * Example:
+	 * <pre>{@code
+	 * metrics.reporters = foo, bar
+	 *
+	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
+	 * metrics.reporter.foo.interval = 10
+	 *
+	 * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
+	 * metrics.reporter.bar.port = 1337
+	 * }</pre>
+	 */
+	public static final ConfigOption<String> REPORTERS_LIST =
+		key("metrics.reporters")
+			.noDefaultValue();
+
+	/** The delimiter used to assemble the metric identifier. */
+	public static final ConfigOption<String> SCOPE_DELIMITER =
+		key("metrics.scope.delimiter")
+			.defaultValue(".");
+
+	/** The scope format string that is applied to all metrics scoped to a JobManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_JM =
+		key("metrics.scope.jm")
+			.defaultValue("<host>.jobmanager");
+
+	/** The scope format string that is applied to all metrics scoped to a TaskManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_TM =
+		key("metrics.scope.tm")
+			.defaultValue("<host>.taskmanager.<tm_id>");
+
+	/** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_JM_JOB =
+		key("metrics.scope.jm.job")
+			.defaultValue("<host>.jobmanager.<job_name>");
+
+	/** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+	public static final ConfigOption<String> SCOPE_NAMING_TM_JOB =
+		key("metrics.scope.tm.job")
+			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>");
+
+	/** The scope format string that is applied to all metrics scoped to a task. */
+	public static final ConfigOption<String> SCOPE_NAMING_TASK =
+		key("metrics.scope.task")
+			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
+
+	/** The scope format string that is applied to all metrics scoped to an operator. */
+	public static final ConfigOption<String> SCOPE_NAMING_OPERATOR =
+		key("metrics.scope.operator")
+			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>");
+
+	/** The number of measured latencies to maintain at each operator. */
+	public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
+		key("metrics.latency.history-size")
+			.defaultValue(128);
+
+	private MetricOptions() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 87c4ccb..73e7f0b 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.ScheduledReporter;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -76,13 +77,13 @@ public class ScheduledDropwizardReporterTest {
 		String taskManagerId = "tas:kMana::ger";
 		String counterName = "testCounter";
 
-		configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		configuration.setString(MetricOptions.REPORTERS_LIST, "test");
 		configuration.setString(
 				ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
 				"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
 
-		configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
-		configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
+		configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
 		MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 2f2a536..8f7796c 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -28,6 +28,7 @@ import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -96,7 +97,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		int size = 10;
 		String histogramMetricName = "histogram";
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
+		config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 1a96287..85ab897 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -94,7 +95,7 @@ public class JMXReporterTest extends TestLogger {
 	@Test
 	public void testPortConflictHandling() throws Exception {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
@@ -154,7 +155,7 @@ public class JMXReporterTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
 
-		cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
@@ -230,7 +231,7 @@ public class JMXReporterTest extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
 			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -280,7 +281,7 @@ public class JMXReporterTest extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
 			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 934a621..6d54db3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -55,11 +56,11 @@ public class JMXJobManagerMetricTest {
 		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 		Configuration flinkConfiguration = new Configuration();
 
-		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test");
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
 
-		flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
+		flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
 
 		TestingCluster flink = new TestingCluster(flinkConfiguration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 17fd65a..9215c3f 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.statsd;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
@@ -75,13 +76,13 @@ public class StatsDReporterTest extends TestLogger {
 		String taskManagerId = "tas:kMana::ger";
 		String counterName = "testCounter";
 
-		configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		configuration.setString(MetricOptions.REPORTERS_LIST, "test");
 		configuration.setString(
 				ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
 				"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
 
-		configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
-		configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
+		configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
 		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
 
@@ -145,7 +146,7 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+			config.setString(MetricOptions.REPORTERS_LIST, "test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
@@ -219,7 +220,7 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+			config.setString(MetricOptions.REPORTERS_LIST, "test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 596fc82..3475f04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -101,13 +101,13 @@ public class MetricRegistryConfiguration {
 
 		char delim;
 		try {
-			delim = configuration.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
+			delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
 		} catch (Exception e) {
 			LOG.warn("Failed to parse delimiter, using default delimiter.", e);
 			delim = '.';
 		}
 
-		final String definedReporters = configuration.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+		final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
 		List<Tuple2<String, Configuration>> reporterConfigurations;
 
 		if (definedReporters == null) {
@@ -136,18 +136,12 @@ public class MetricRegistryConfiguration {
 	 * @return Scope formats extracted from the given configuration
 	 */
 	static ScopeFormats createScopeConfig(Configuration configuration) {
-		String jmFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-		String jmJobFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-		String tmFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String tmJobFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-		String taskFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-		String operatorFormat = configuration.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+		String jmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM);
+		String jmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+		String tmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM);
+		String tmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+		String taskFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TASK);
+		String operatorFormat = configuration.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
 
 		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index adf0a85..f9efb88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -70,46 +70,15 @@ public abstract class ScopeFormat {
 
 	public static final String SCOPE_HOST = asVariable("host");
 
-	// ----- Job Manager ----
-
-	/** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */
-	public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
-		concat(SCOPE_HOST, "jobmanager");
-
-	/** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */
-	public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
-
 	// ----- Task Manager ----
 
 	public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
 
-	/** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
-			concat(SCOPE_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
-
-	/** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
-
 	// ----- Job -----
 
 	public static final String SCOPE_JOB_ID = asVariable("job_id");
 	public static final String SCOPE_JOB_NAME = asVariable("job_name");
 
-	/** The default scope format for the job component: {@code "<job_name>"} */
-	public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
-
-	// ----- Job on Job Manager ----
-
-	/** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */
-	public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
-		concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
-
-	// ----- Job on Task Manager ----
-
-	/** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */
-	public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT);
-
 	// ----- Task ----
 
 	public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
@@ -118,27 +87,9 @@ public abstract class ScopeFormat {
 	public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num");
 	public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index");
 
-	/** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */
-	public static final String DEFAULT_SCOPE_TASK_COMPONENT =
-			concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-	/** The default scope format for all task metrics:
-	 * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
-	public static final String DEFAULT_SCOPE_TASK_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT);
-
 	// ----- Operator ----
 
 	public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");
-
-	/** The default scope added by the operator component: "<operator_name>.<subtask_index>" */
-	public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
-			concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-	/** The default scope format for all operator metrics:
-	 * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
-	public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
-			concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT);
 	
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bbbe6ba..bde93be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.metrics.scope;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,21 +41,21 @@ public class ScopeFormats {
 	 * Creates all default scope formats.
 	 */
 	public ScopeFormats() {
-		this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+		this.jobManagerFormat = new JobManagerScopeFormat(MetricOptions.SCOPE_NAMING_JM.defaultValue());
 
 		this.jobManagerJobFormat = new JobManagerJobScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat);
+			MetricOptions.SCOPE_NAMING_JM_JOB.defaultValue(), this.jobManagerFormat);
 
-		this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
+		this.taskManagerFormat = new TaskManagerScopeFormat(MetricOptions.SCOPE_NAMING_TM.defaultValue());
 
 		this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
+			MetricOptions.SCOPE_NAMING_TM_JOB.defaultValue(), this.taskManagerFormat);
 
 		this.taskFormat = new TaskScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat);
+				MetricOptions.SCOPE_NAMING_TASK.defaultValue(), this.taskManagerJobFormat);
 
 		this.operatorFormat = new OperatorScopeFormat(
-				ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat);
+			MetricOptions.SCOPE_NAMING_OPERATOR.defaultValue(), this.taskFormat);
 	}
 
 	/**
@@ -135,18 +135,12 @@ public class ScopeFormats {
 	 * @return The ScopeFormats parsed from the configuration
 	 */
 	public static ScopeFormats fromConfig(Configuration config) {
-		String jmFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-		String jmJobFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-		String tmFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String tmJobFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-		String taskFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-		String operatorFormat = config.getString(
-				ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+		String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM);
+		String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+		String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM);
+		String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+		String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK);
+		String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
 
 		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index fe29ccb..b9502b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
@@ -69,7 +70,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testReporterInstantiation() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
 		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -97,7 +98,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testMultipleReporterInstantiation() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1, test2,test3");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
@@ -147,7 +148,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testReporterArgumentForwarding() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
@@ -172,7 +173,7 @@ public class MetricRegistryTest extends TestLogger {
 	public void testReporterScheduling() throws InterruptedException {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -215,7 +216,7 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testReporterNotifications() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
 
@@ -278,10 +279,10 @@ public class MetricRegistryTest extends TestLogger {
 	public void testScopeConfig() {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "B");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+		config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+		config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+		config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
 
 		ScopeFormats scopeConfig = MetricRegistryConfiguration.createScopeConfig(config);
 
@@ -294,8 +295,8 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiter() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E");
+		config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
@@ -308,7 +309,7 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiterForReporters() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -331,7 +332,7 @@ public class MetricRegistryTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiterForReportersInGroup() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3,test4");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -339,7 +340,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 		List<MetricReporter> reporters = registry.getReporters();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index da14bfd..04e40ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
@@ -77,8 +78,8 @@ public class AbstractMetricGroupTest {
 	@Test
 	public void testScopeCachingForMultipleReporters() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
@@ -166,7 +167,7 @@ public class AbstractMetricGroupTest {
 	@Test
 	public void testScopeGenerationWithoutReporters() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
 		MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 317d19e..7834755 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -110,7 +110,7 @@ public class JobManagerGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index ed4056f..d8bd57a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -54,8 +54,8 @@ public class JobManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
@@ -77,8 +77,8 @@ public class JobManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustomWildcard() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
+		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index f121b59..ace0236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -41,7 +42,7 @@ public class MetricGroupRegistrationTest {
 	@Test
 	public void testMetricInstantiation() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 965e8fb..11f3c0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -161,7 +161,7 @@ public class TaskManagerGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index f9bef6c..f9891cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -54,8 +54,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
@@ -76,8 +76,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustomWildcard() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "peter.<tm_id>");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index eb906d0..183237a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -63,9 +63,9 @@ public class TaskMetricGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "def");
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
@@ -90,7 +90,7 @@ public class TaskMetricGroupTest extends TestLogger {
 	@Test
 	public void testGenerateScopeWilcard() {
 		Configuration cfg = new Configuration();
-		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
+		cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
 		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		AbstractID executionId = new AbstractID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7569170..99c3f4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -177,10 +177,10 @@ public abstract class AbstractStreamOperator<OUT>
 			((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
 		}
 		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
-		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+		int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
 		if (historySize <= 0) {
-			LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
-			historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+			LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+			historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
 		}
 
 		latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));


Mime
View raw message