Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B75E2200C4E for ; Fri, 21 Apr 2017 14:24:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B5BB9160B97; Fri, 21 Apr 2017 12:24:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B9FAE160BBC for ; Fri, 21 Apr 2017 14:24:16 +0200 (CEST) Received: (qmail 125 invoked by uid 500); 21 Apr 2017 12:24:15 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 99460 invoked by uid 99); 21 Apr 2017 12:24:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 12:24:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 53082F4A1D; Fri, 21 Apr 2017 12:24:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 21 Apr 2017 12:24:23 -0000 Message-Id: <6c1f27a171404640b94fbc5e5b08cf4a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/13] flink git commit: [FLINK-4769] [metrics] Port metric config parameters to ConfigOptions archived-at: Fri, 21 Apr 2017 12:24:18 -0000 [FLINK-4769] [metrics] Port metric config parameters to ConfigOptions This closes #3687 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b77cc3f Tree: 
http://git-wip-us.apache.org/repos/asf/flink/tree/1b77cc3f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b77cc3f Branch: refs/heads/master Commit: 1b77cc3f2682afd839ce6b16cd1658bbd01a6b04 Parents: c0ea74a Author: zentol Authored: Thu Apr 6 13:47:33 2017 +0200 Committer: Stephan Ewen Committed: Fri Apr 21 12:31:30 2017 +0200 ---------------------------------------------------------------------- .../connectors/kafka/KafkaTestBase.java | 3 +- .../flink/configuration/ConfigConstants.java | 44 +++++----- .../flink/configuration/MetricOptions.java | 88 ++++++++++++++++++++ .../ScheduledDropwizardReporterTest.java | 7 +- .../DropwizardFlinkHistogramWrapperTest.java | 3 +- .../flink/metrics/jmx/JMXReporterTest.java | 9 +- .../jobmanager/JMXJobManagerMetricTest.java | 5 +- .../metrics/statsd/StatsDReporterTest.java | 11 +-- .../metrics/MetricRegistryConfiguration.java | 24 ++---- .../runtime/metrics/scope/ScopeFormat.java | 49 ----------- .../runtime/metrics/scope/ScopeFormats.java | 32 +++---- .../runtime/metrics/MetricRegistryTest.java | 29 +++---- .../metrics/groups/AbstractMetricGroupTest.java | 7 +- .../metrics/groups/JobManagerGroupTest.java | 4 +- .../metrics/groups/JobManagerJobGroupTest.java | 10 +-- .../groups/MetricGroupRegistrationTest.java | 3 +- .../metrics/groups/TaskManagerGroupTest.java | 4 +- .../metrics/groups/TaskManagerJobGroupTest.java | 10 +-- .../metrics/groups/TaskMetricGroupTest.java | 10 +-- .../api/operators/AbstractStreamOperator.java | 8 +- 20 files changed, 196 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 0c6bfa9..a21a239 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; @@ -116,7 +117,7 @@ public abstract class KafkaTestBase extends TestLogger { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); + flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); return flinkConfig; } http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a035beb..61c1b27 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -990,21 +990,8 @@ public final class ConfigConstants { // ---------------------------- Metrics ----------------------------------- - /** - * The list of named reporters. Names are defined here and per-reporter configs - * are given with the reporter config prefix and the reporter name. - * - * Example: - *
<pre>{@code
-	 * metrics.reporters = foo, bar
-	 *
-	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
-	 * metrics.reporter.foo.interval = 10
-	 *
-	 * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
-	 * metrics.reporter.bar.port = 1337
-	 * }</pre>
- */ + /** @deprecated Use {@link MetricOptions#REPORTERS_LIST} instead. */ + @Deprecated public static final String METRICS_REPORTERS_LIST = "metrics.reporters"; /** @@ -1022,28 +1009,36 @@ public final class ConfigConstants { /** The delimiter used to assemble the metric identifier. This is used as a suffix in an actual reporter config. */ public static final String METRICS_REPORTER_SCOPE_DELIMITER = "scope.delimiter"; - /** The delimiter used to assemble the metric identifier. */ + /** @deprecated Use {@link MetricOptions#SCOPE_DELIMITER} instead. */ + @Deprecated public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter"; - /** The scope format string that is applied to all metrics scoped to a JobManager. */ + /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM} instead. */ + @Deprecated public static final String METRICS_SCOPE_NAMING_JM = "metrics.scope.jm"; - /** The scope format string that is applied to all metrics scoped to a TaskManager. */ + /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM} instead. */ + @Deprecated public static final String METRICS_SCOPE_NAMING_TM = "metrics.scope.tm"; - /** The scope format string that is applied to all metrics scoped to a job on a JobManager. */ + /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM_JOB} instead. */ + @Deprecated public static final String METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job"; - /** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */ + /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM_JOB} instead. */ + @Deprecated public static final String METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job"; - /** The scope format string that is applied to all metrics scoped to a task. */ + /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TASK} instead. 
*/ + @Deprecated public static final String METRICS_SCOPE_NAMING_TASK = "metrics.scope.task"; - /** The scope format string that is applied to all metrics scoped to an operator. */ + /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_OPERATOR} instead. */ + @Deprecated public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; - /** The number of measured latencies to maintain at each operator */ + /** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */ + @Deprecated public static final String METRICS_LATENCY_HISTORY_SIZE = "metrics.latency.history-size"; @@ -1498,7 +1493,8 @@ public final class ConfigConstants { // ----------------------------- Metrics ---------------------------- - /** The default number of measured latencies to maintain at each operator */ + /** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. */ + @Deprecated public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128; // ----------------------------- Environment Variables ---------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java new file mode 100644 index 0000000..8a328ac --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +@PublicEvolving +public class MetricOptions { + + /** + * The list of named reporters. Names are defined here and per-reporter configs + * are given with the reporter config prefix and the reporter name. + * + * Example: + *
<pre>{@code
+	 * metrics.reporters = foo, bar
+	 *
+	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
+	 * metrics.reporter.foo.interval = 10
+	 *
+	 * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
+	 * metrics.reporter.bar.port = 1337
+	 * }</pre>
+ */ + public static final ConfigOption<String> REPORTERS_LIST = + key("metrics.reporters") + .noDefaultValue(); + + /** The delimiter used to assemble the metric identifier. */ + public static final ConfigOption<String> SCOPE_DELIMITER = + key("metrics.scope.delimiter") + .defaultValue("."); + + /** The scope format string that is applied to all metrics scoped to a JobManager. */ + public static final ConfigOption<String> SCOPE_NAMING_JM = + key("metrics.scope.jm") + .defaultValue("<host>.jobmanager"); + + /** The scope format string that is applied to all metrics scoped to a TaskManager. */ + public static final ConfigOption<String> SCOPE_NAMING_TM = + key("metrics.scope.tm") + .defaultValue("<host>.taskmanager.<tm_id>"); + + /** The scope format string that is applied to all metrics scoped to a job on a JobManager. */ + public static final ConfigOption<String> SCOPE_NAMING_JM_JOB = + key("metrics.scope.jm.job") + .defaultValue("<host>.jobmanager.<job_name>"); + + /** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */ + public static final ConfigOption<String> SCOPE_NAMING_TM_JOB = + key("metrics.scope.tm.job") + .defaultValue("<host>.taskmanager.<tm_id>.<job_name>"); + + /** The scope format string that is applied to all metrics scoped to a task. */ + public static final ConfigOption<String> SCOPE_NAMING_TASK = + key("metrics.scope.task") + .defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"); + + /** The scope format string that is applied to all metrics scoped to an operator. 
*/ + public static final ConfigOption SCOPE_NAMING_OPERATOR = + key("metrics.scope.operator") + .defaultValue(".taskmanager...."); + + /** The number of measured latencies to maintain at each operator */ + public static final ConfigOption LATENCY_HISTORY_SIZE = + key("metrics.latency.history-size") + .defaultValue(128); + + private MetricOptions() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 87c4ccb..73e7f0b 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -22,6 +22,7 @@ import com.codahale.metrics.ScheduledReporter; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -76,13 +77,13 @@ public class ScheduledDropwizardReporterTest { String taskManagerId = "tas:kMana::ger"; String counterName = "testCounter"; - configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + configuration.setString(MetricOptions.REPORTERS_LIST, "test"); configuration.setString( ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, "org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter"); - configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, ".."); - configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); + configuration.setString(MetricOptions.SCOPE_NAMING_TASK, ".."); + configuration.setString(MetricOptions.SCOPE_DELIMITER, "_"); MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index 2f2a536..8f7796c 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -28,6 +28,7 @@ import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.MetricReporter; @@ -96,7 +97,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger { int size = 10; String histogramMetricName = "histogram"; Configuration config = new Configuration(); - 
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); + config.setString(MetricOptions.REPORTERS_LIST, "my_reporter"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS"); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 1a96287..85ab897 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -20,6 +20,7 @@ package org.apache.flink.metrics.jmx; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.metrics.reporter.MetricReporter; @@ -94,7 +95,7 @@ public class JMXReporterTest extends TestLogger { @Test public void testPortConflictHandling() throws Exception { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035"); @@ -154,7 +155,7 @@ public class JMXReporterTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); - cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055"); @@ -230,7 +231,7 @@ public class JMXReporterTest extends TestLogger { try { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test"); + config.setString(MetricOptions.REPORTERS_LIST, "jmx_test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); @@ -280,7 +281,7 @@ public class JMXReporterTest extends TestLogger { try { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test"); + config.setString(MetricOptions.REPORTERS_LIST, "jmx_test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 934a621..6d54db3 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -55,11 +56,11 @@ public class JMXJobManagerMetricTest { Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); Configuration flinkConfiguration = new Configuration(); - flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test"); flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075"); - flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager."); + flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager."); TestingCluster flink = new TestingCluster(flinkConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 17fd65a..9215c3f 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -21,6 +21,7 @@ package org.apache.flink.metrics.statsd; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; @@ -75,13 +76,13 @@ public class StatsDReporterTest extends TestLogger { String taskManagerId = "tas:kMana::ger"; String counterName = "testCounter"; - configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + configuration.setString(MetricOptions.REPORTERS_LIST, "test"); configuration.setString( ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, "org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter"); - configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, ".."); - configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); + configuration.setString(MetricOptions.SCOPE_NAMING_TASK, ".."); + configuration.setString(MetricOptions.SCOPE_DELIMITER, "_"); MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)); @@ -145,7 +146,7 @@ public class StatsDReporterTest extends TestLogger { int port = receiver.getPort(); Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost"); @@ -219,7 +220,7 @@ public class StatsDReporterTest extends TestLogger { int port = receiver.getPort(); Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost"); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java index 596fc82..3475f04 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DelegatingConfiguration; -import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -101,13 +101,13 @@ public class MetricRegistryConfiguration { char delim; try { - delim = configuration.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0); + delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0); } catch (Exception e) { LOG.warn("Failed to parse delimiter, using default delimiter.", e); delim = '.'; } - final String definedReporters = configuration.getString(ConfigConstants.METRICS_REPORTERS_LIST, null); + final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST); List> reporterConfigurations; if (definedReporters == null) { @@ -136,18 +136,12 @@ public class MetricRegistryConfiguration { * @return Scope formats extracted 
from the given configuration */ static ScopeFormats createScopeConfig(Configuration configuration) { - String jmFormat = configuration.getString( - ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP); - String jmJobFormat = configuration.getString( - ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP); - String tmFormat = configuration.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); - String tmJobFormat = configuration.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); - String taskFormat = configuration.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); - String operatorFormat = configuration.getString( - ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); + String jmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM); + String jmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_JM_JOB); + String tmFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM); + String tmJobFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TM_JOB); + String taskFormat = configuration.getString(MetricOptions.SCOPE_NAMING_TASK); + String operatorFormat = configuration.getString(MetricOptions.SCOPE_NAMING_OPERATOR); return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat); } http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java index adf0a85..f9efb88 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java @@ -70,46 +70,15 @@ public abstract class ScopeFormat { public static final String SCOPE_HOST = asVariable("host"); - // ----- Job Manager ---- - - /** The default scope format of the JobManager component: {@code ".jobmanager"} */ - public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT = - concat(SCOPE_HOST, "jobmanager"); - - /** The default scope format of JobManager metrics: {@code ".jobmanager"} */ - public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT; - // ----- Task Manager ---- public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id"); - /** The default scope format of the TaskManager component: {@code ".taskmanager."} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT = - concat(SCOPE_HOST, "taskmanager", SCOPE_TASKMANAGER_ID); - - /** The default scope format of TaskManager metrics: {@code ".taskmanager."} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT; - // ----- Job ----- public static final String SCOPE_JOB_ID = asVariable("job_id"); public static final String SCOPE_JOB_NAME = asVariable("job_name"); - /** The default scope format for the job component: {@code ""} */ - public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME; - - // ----- Job on Job Manager ---- - - /** The default scope format for all job metrics on a jobmanager: {@code ".jobmanager."} */ - public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP = - concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); - - // ----- Job on Task Manager ---- - - /** The default scope format for all job metrics on a taskmanager: {@code ".taskmanager.."} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP = - 
concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); - // ----- Task ---- public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id"); @@ -118,27 +87,9 @@ public abstract class ScopeFormat { public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num"); public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index"); - /** Default scope of the task component: {@code "."} */ - public static final String DEFAULT_SCOPE_TASK_COMPONENT = - concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX); - - /** The default scope format for all task metrics: - * {@code ".taskmanager...."} */ - public static final String DEFAULT_SCOPE_TASK_GROUP = - concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT); - // ----- Operator ---- public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name"); - - /** The default scope added by the operator component: "." */ - public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = - concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX); - - /** The default scope format for all operator metrics: - * {@code ".taskmanager...."} */ - public static final String DEFAULT_SCOPE_OPERATOR_GROUP = - concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT); // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java index bbbe6ba..bde93be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java @@ 
-18,8 +18,8 @@ package org.apache.flink.runtime.metrics.scope; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,21 +41,21 @@ public class ScopeFormats { * Creates all default scope formats. */ public ScopeFormats() { - this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT); + this.jobManagerFormat = new JobManagerScopeFormat(MetricOptions.SCOPE_NAMING_JM.defaultValue()); this.jobManagerJobFormat = new JobManagerJobScopeFormat( - ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat); + MetricOptions.SCOPE_NAMING_JM_JOB.defaultValue(), this.jobManagerFormat); - this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT); + this.taskManagerFormat = new TaskManagerScopeFormat(MetricOptions.SCOPE_NAMING_TM.defaultValue()); this.taskManagerJobFormat = new TaskManagerJobScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat); + MetricOptions.SCOPE_NAMING_TM_JOB.defaultValue(), this.taskManagerFormat); this.taskFormat = new TaskScopeFormat( - ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat); + MetricOptions.SCOPE_NAMING_TASK.defaultValue(), this.taskManagerJobFormat); this.operatorFormat = new OperatorScopeFormat( - ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat); + MetricOptions.SCOPE_NAMING_OPERATOR.defaultValue(), this.taskFormat); } /** @@ -135,18 +135,12 @@ public class ScopeFormats { * @return The ScopeFormats parsed from the configuration */ public static ScopeFormats fromConfig(Configuration config) { - String jmFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP); - String jmJobFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP); - String tmFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); - String tmJobFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); - String taskFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); - String operatorFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); + String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM); + String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB); + String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM); + String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB); + String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK); + String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR); return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat); } http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index fe29ccb..b9502b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -23,6 +23,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import 
org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; @@ -69,7 +70,7 @@ public class MetricRegistryTest extends TestLogger { public void testReporterInstantiation() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); @@ -97,7 +98,7 @@ public class MetricRegistryTest extends TestLogger { public void testMultipleReporterInstantiation() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1, test2,test3"); + config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName()); @@ -147,7 +148,7 @@ public class MetricRegistryTest extends TestLogger { public void testReporterArgumentForwarding() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); @@ -172,7 +173,7 @@ public class MetricRegistryTest extends TestLogger { public void testReporterScheduling() throws InterruptedException { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS"); @@ -215,7 +216,7 @@ public class MetricRegistryTest extends TestLogger { @Test public void testReporterNotifications() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); @@ -278,10 +279,10 @@ public class MetricRegistryTest extends TestLogger { public void testScopeConfig() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "B"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D"); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A"); + config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B"); + config.setString(MetricOptions.SCOPE_NAMING_TASK, "C"); + config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D"); ScopeFormats scopeConfig = MetricRegistryConfiguration.createScopeConfig(config); @@ -294,8 +295,8 @@ public class MetricRegistryTest extends TestLogger { @Test public void testConfigurableDelimiter() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E"); + config.setString(MetricOptions.SCOPE_DELIMITER, "_"); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E"); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); @@ -308,7 +309,7 @@ public class MetricRegistryTest extends TestLogger { @Test public void testConfigurableDelimiterForReporters() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3"); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); @@ -331,7 +332,7 @@ public class MetricRegistryTest extends TestLogger { @Test public void testConfigurableDelimiterForReportersInGroup() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3,test4"); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); @@ -339,7 +340,7 @@ public class MetricRegistryTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B"); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B"); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); List reporters = registry.getReporters(); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index da14bfd..04e40ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; @@ -77,8 +78,8 @@ public class AbstractMetricGroupTest { @Test public void testScopeCachingForMultipleReporters() throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D"); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D"); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); @@ -166,7 +167,7 @@ public class AbstractMetricGroupTest { @Test public void testScopeGenerationWithoutReporters() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D"); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D"); MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); try { http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 317d19e..7834755 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; @@ -110,7 +110,7 @@ public class JobManagerGroupTest extends TestLogger { @Test public void testGenerateScopeCustom() { Configuration cfg 
= new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "constant..foo."); + cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant..foo."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host"); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index ed4056f..d8bd57a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; @@ -54,8 +54,8 @@ public class JobManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc"); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "some-constant."); + cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc"); + cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID 
jid = new JobID(); @@ -77,8 +77,8 @@ public class JobManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeCustomWildcard() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter"); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "*.some-constant."); + cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter"); + cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index f121b59..ace0236 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; @@ -41,7 +42,7 @@ public class MetricGroupRegistrationTest { @Test public void testMetricInstantiation() { Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index 965e8fb..11f3c0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -161,7 +161,7 @@ public class TaskManagerGroupTest extends TestLogger { @Test public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "constant..foo."); + cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant..foo."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java index f9bef6c..f9891cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; @@ -54,8 +54,8 @@ public class TaskManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc"); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "some-constant."); + cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); + cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); @@ -76,8 +76,8 @@ public class TaskManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeCustomWildcard() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "peter."); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "*.some-constant."); + cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter."); + cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index eb906d0..183237a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; @@ -63,9 +63,9 @@ public class TaskMetricGroupTest extends TestLogger { @Test public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc"); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "def"); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "..."); + cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); + cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def"); + cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "..."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); @@ -90,7 +90,7 @@ public class TaskMetricGroupTest extends TestLogger { @Test public void testGenerateScopeWilcard() { Configuration cfg = new Configuration(); - cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "*.."); + 
cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.."); MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); AbstractID executionId = new AbstractID(); http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 7569170..99c3f4e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -28,8 +28,8 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -177,10 +177,10 @@ public abstract class AbstractStreamOperator ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask(); } Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); - int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE); + int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE); if (historySize <= 
0) { - LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize); - historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE; + LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize); + historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(); } latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));